Bladeren bron

Update dependencies

世界 2 jaren geleden
bovenliggende
commit
18e5b0963f

+ 19 - 21
app/dispatcher/default.go

@@ -342,29 +342,27 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
 	}
 	sniffingRequest := content.SniffingRequest
 	if !sniffingRequest.Enabled {
-		go d.routedDispatch(ctx, outbound, destination)
+		d.routedDispatch(ctx, outbound, destination)
 	} else {
-		go func() {
-			cReader := &cachedReader{
-				reader: outbound.Reader.(*pipe.Reader),
-			}
-			outbound.Reader = cReader
-			result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
-			if err == nil {
-				content.Protocol = result.Protocol()
-			}
-			if err == nil && d.shouldOverride(ctx, result, sniffingRequest, destination) {
-				domain := result.Domain()
-				newError("sniffed domain: ", domain).WriteToLog(session.ExportIDToError(ctx))
-				destination.Address = net.ParseAddress(domain)
-				if sniffingRequest.RouteOnly && result.Protocol() != "fakedns" {
-					ob.RouteTarget = destination
-				} else {
-					ob.Target = destination
-				}
+		cReader := &cachedReader{
+			reader: outbound.Reader.(*pipe.Reader),
+		}
+		outbound.Reader = cReader
+		result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
+		if err == nil {
+			content.Protocol = result.Protocol()
+		}
+		if err == nil && d.shouldOverride(ctx, result, sniffingRequest, destination) {
+			domain := result.Domain()
+			newError("sniffed domain: ", domain).WriteToLog(session.ExportIDToError(ctx))
+			destination.Address = net.ParseAddress(domain)
+			if sniffingRequest.RouteOnly && result.Protocol() != "fakedns" {
+				ob.RouteTarget = destination
+			} else {
+				ob.Target = destination
 			}
-			d.routedDispatch(ctx, outbound, destination)
-		}()
+		}
+		d.routedDispatch(ctx, outbound, destination)
 	}
 
 	return nil

+ 1 - 1
app/proxyman/outbound/handler.go

@@ -211,7 +211,7 @@ out:
 		err.WriteToLog(session.ExportIDToError(ctx))
 		common.Interrupt(link.Writer)
 	} else {
-		common.Must(common.Close(link.Writer))
+		common.Close(link.Writer)
 	}
 	common.Interrupt(link.Reader)
 }

+ 46 - 0
common/singbridge/destination.go

@@ -0,0 +1,46 @@
+package singbridge
+
+import (
+	M "github.com/sagernet/sing/common/metadata"
+	N "github.com/sagernet/sing/common/network"
+	"github.com/xtls/xray-core/common/net"
+)
+
+func ToNetwork(network string) net.Network {
+	switch N.NetworkName(network) {
+	case N.NetworkTCP:
+		return net.Network_TCP
+	case N.NetworkUDP:
+		return net.Network_UDP
+	default:
+		return net.Network_Unknown
+	}
+}
+
+func ToDestination(socksaddr M.Socksaddr, network net.Network) net.Destination {
+	if socksaddr.IsFqdn() {
+		return net.Destination{
+			Network: network,
+			Address: net.DomainAddress(socksaddr.Fqdn),
+			Port:    net.Port(socksaddr.Port),
+		}
+	} else {
+		return net.Destination{
+			Network: network,
+			Address: net.IPAddress(socksaddr.Addr.AsSlice()),
+			Port:    net.Port(socksaddr.Port),
+		}
+	}
+}
+
+func ToSocksaddr(destination net.Destination) M.Socksaddr {
+	var addr M.Socksaddr
+	switch destination.Address.Family() {
+	case net.AddressFamilyDomain:
+		addr.Fqdn = destination.Address.Domain()
+	default:
+		addr.Addr = M.AddrFromIP(destination.Address.IP())
+	}
+	addr.Port = uint16(destination.Port)
+	return addr
+}

+ 59 - 0
common/singbridge/dialer.go

@@ -0,0 +1,59 @@
+package singbridge
+
+import (
+	"context"
+	"os"
+
+	M "github.com/sagernet/sing/common/metadata"
+	N "github.com/sagernet/sing/common/network"
+	"github.com/xtls/xray-core/common/net"
+	"github.com/xtls/xray-core/common/net/cnc"
+	"github.com/xtls/xray-core/common/session"
+	"github.com/xtls/xray-core/proxy"
+	"github.com/xtls/xray-core/transport"
+	"github.com/xtls/xray-core/transport/internet"
+	"github.com/xtls/xray-core/transport/pipe"
+)
+
+var _ N.Dialer = (*XrayDialer)(nil)
+
+type XrayDialer struct {
+	internet.Dialer
+}
+
+func NewDialer(dialer internet.Dialer) *XrayDialer {
+	return &XrayDialer{dialer}
+}
+
+func (d *XrayDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) {
+	return d.Dialer.Dial(ctx, ToDestination(destination, ToNetwork(network)))
+}
+
+func (d *XrayDialer) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) {
+	return nil, os.ErrInvalid
+}
+
+type XrayOutboundDialer struct {
+	outbound proxy.Outbound
+	dialer   internet.Dialer
+}
+
+func NewOutboundDialer(outbound proxy.Outbound, dialer internet.Dialer) *XrayOutboundDialer {
+	return &XrayOutboundDialer{outbound, dialer}
+}
+
+func (d *XrayOutboundDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) {
+	ctx = session.ContextWithOutbound(context.Background(), &session.Outbound{
+		Target: ToDestination(destination, ToNetwork(network)),
+	})
+	opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)}
+	uplinkReader, uplinkWriter := pipe.New(opts...)
+	downlinkReader, downlinkWriter := pipe.New(opts...)
+	conn := cnc.NewConnection(cnc.ConnectionInputMulti(downlinkWriter), cnc.ConnectionOutputMulti(uplinkReader))
+	go d.outbound.Process(ctx, &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}, d.dialer)
+	return conn, nil
+}
+
+func (d *XrayOutboundDialer) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) {
+	return nil, os.ErrInvalid
+}

+ 10 - 0
common/singbridge/error.go

@@ -0,0 +1,10 @@
+package singbridge
+
+import E "github.com/sagernet/sing/common/exceptions"
+
+func ReturnError(err error) error {
+	if E.IsClosedOrCanceled(err) {
+		return nil
+	}
+	return err
+}

+ 51 - 0
common/singbridge/handler.go

@@ -0,0 +1,51 @@
+package singbridge
+
+import (
+	"context"
+	"io"
+
+	M "github.com/sagernet/sing/common/metadata"
+	N "github.com/sagernet/sing/common/network"
+	"github.com/xtls/xray-core/common/buf"
+	"github.com/xtls/xray-core/common/errors"
+	"github.com/xtls/xray-core/common/net"
+	"github.com/xtls/xray-core/common/session"
+	"github.com/xtls/xray-core/features/routing"
+	"github.com/xtls/xray-core/transport"
+)
+
+var (
+	_ N.TCPConnectionHandler = (*Dispatcher)(nil)
+	_ N.UDPConnectionHandler = (*Dispatcher)(nil)
+)
+
+type Dispatcher struct {
+	upstream     routing.Dispatcher
+	newErrorFunc func(values ...any) *errors.Error
+}
+
+func NewDispatcher(dispatcher routing.Dispatcher, newErrorFunc func(values ...any) *errors.Error) *Dispatcher {
+	return &Dispatcher{
+		upstream:     dispatcher,
+		newErrorFunc: newErrorFunc,
+	}
+}
+
+func (d *Dispatcher) NewConnection(ctx context.Context, conn net.Conn, metadata M.Metadata) error {
+	xConn := NewConn(conn)
+	return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_TCP), &transport.Link{
+		Reader: xConn,
+		Writer: xConn,
+	})
+}
+
+func (d *Dispatcher) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error {
+	return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_UDP), &transport.Link{
+		Reader: buf.NewPacketReader(conn.(io.Reader)),
+		Writer: buf.NewWriter(conn.(io.Writer)),
+	})
+}
+
+func (d *Dispatcher) NewError(ctx context.Context, err error) {
+	d.newErrorFunc(err).WriteToLog(session.ExportIDToError(ctx))
+}

+ 71 - 0
common/singbridge/logger.go

@@ -0,0 +1,71 @@
+package singbridge
+
+import (
+	"context"
+
+	"github.com/sagernet/sing/common/logger"
+	"github.com/xtls/xray-core/common/errors"
+	"github.com/xtls/xray-core/common/session"
+)
+
+var _ logger.ContextLogger = (*XrayLogger)(nil)
+
+type XrayLogger struct {
+	newError func(values ...any) *errors.Error
+}
+
+func NewLogger(newErrorFunc func(values ...any) *errors.Error) *XrayLogger {
+	return &XrayLogger{
+		newErrorFunc,
+	}
+}
+
+func (l *XrayLogger) Trace(args ...any) {
+}
+
+func (l *XrayLogger) Debug(args ...any) {
+	l.newError(args...).AtDebug().WriteToLog()
+}
+
+func (l *XrayLogger) Info(args ...any) {
+	l.newError(args...).AtInfo().WriteToLog()
+}
+
+func (l *XrayLogger) Warn(args ...any) {
+	l.newError(args...).AtWarning().WriteToLog()
+}
+
+func (l *XrayLogger) Error(args ...any) {
+	l.newError(args...).AtError().WriteToLog()
+}
+
+func (l *XrayLogger) Fatal(args ...any) {
+}
+
+func (l *XrayLogger) Panic(args ...any) {
+}
+
+func (l *XrayLogger) TraceContext(ctx context.Context, args ...any) {
+}
+
+func (l *XrayLogger) DebugContext(ctx context.Context, args ...any) {
+	l.newError(args...).AtDebug().WriteToLog(session.ExportIDToError(ctx))
+}
+
+func (l *XrayLogger) InfoContext(ctx context.Context, args ...any) {
+	l.newError(args...).AtInfo().WriteToLog(session.ExportIDToError(ctx))
+}
+
+func (l *XrayLogger) WarnContext(ctx context.Context, args ...any) {
+	l.newError(args...).AtWarning().WriteToLog(session.ExportIDToError(ctx))
+}
+
+func (l *XrayLogger) ErrorContext(ctx context.Context, args ...any) {
+	l.newError(args...).AtError().WriteToLog(session.ExportIDToError(ctx))
+}
+
+func (l *XrayLogger) FatalContext(ctx context.Context, args ...any) {
+}
+
+func (l *XrayLogger) PanicContext(ctx context.Context, args ...any) {
+}

+ 82 - 0
common/singbridge/packet.go

@@ -0,0 +1,82 @@
+package singbridge
+
+import (
+	"context"
+
+	B "github.com/sagernet/sing/common/buf"
+	"github.com/sagernet/sing/common/bufio"
+	M "github.com/sagernet/sing/common/metadata"
+	"github.com/xtls/xray-core/common/buf"
+	"github.com/xtls/xray-core/common/net"
+	"github.com/xtls/xray-core/transport"
+)
+
+func CopyPacketConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, destination net.Destination, serverConn net.PacketConn) error {
+	conn := &PacketConnWrapper{
+		Reader: link.Reader,
+		Writer: link.Writer,
+		Dest:   destination,
+		Conn:   inboundConn,
+	}
+	return ReturnError(bufio.CopyPacketConn(ctx, conn, bufio.NewPacketConn(serverConn)))
+}
+
+type PacketConnWrapper struct {
+	buf.Reader
+	buf.Writer
+	net.Conn
+	Dest   net.Destination
+	cached buf.MultiBuffer
+}
+
+func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) {
+	if w.cached != nil {
+		mb, bb := buf.SplitFirst(w.cached)
+		if bb == nil {
+			w.cached = nil
+		} else {
+			buffer.Write(bb.Bytes())
+			w.cached = mb
+			var destination net.Destination
+			if bb.UDP != nil {
+				destination = *bb.UDP
+			} else {
+				destination = w.Dest
+			}
+			bb.Release()
+			return ToSocksaddr(destination), nil
+		}
+	}
+	mb, err := w.ReadMultiBuffer()
+	if err != nil {
+		return M.Socksaddr{}, err
+	}
+	nb, bb := buf.SplitFirst(mb)
+	if bb == nil {
+		return M.Socksaddr{}, nil
+	} else {
+		buffer.Write(bb.Bytes())
+		w.cached = nb
+		var destination net.Destination
+		if bb.UDP != nil {
+			destination = *bb.UDP
+		} else {
+			destination = w.Dest
+		}
+		bb.Release()
+		return ToSocksaddr(destination), nil
+	}
+}
+
+func (w *PacketConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) error {
+	vBuf := buf.New()
+	vBuf.Write(buffer.Bytes())
+	endpoint := ToDestination(destination, net.Network_UDP)
+	vBuf.UDP = &endpoint
+	return w.Writer.WriteMultiBuffer(buf.MultiBuffer{vBuf})
+}
+
+func (w *PacketConnWrapper) Close() error {
+	buf.ReleaseMulti(w.cached)
+	return nil
+}

+ 61 - 0
common/singbridge/pipe.go

@@ -0,0 +1,61 @@
+package singbridge
+
+import (
+	"context"
+	"io"
+	"net"
+
+	"github.com/sagernet/sing/common/bufio"
+	"github.com/xtls/xray-core/common/buf"
+	"github.com/xtls/xray-core/transport"
+)
+
+func CopyConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, serverConn net.Conn) error {
+	conn := &PipeConnWrapper{
+		W:    link.Writer,
+		Conn: inboundConn,
+	}
+	if ir, ok := link.Reader.(io.Reader); ok {
+		conn.R = ir
+	} else {
+		conn.R = &buf.BufferedReader{Reader: link.Reader}
+	}
+	return ReturnError(bufio.CopyConn(ctx, conn, serverConn))
+}
+
+type PipeConnWrapper struct {
+	R io.Reader
+	W buf.Writer
+	net.Conn
+}
+
+func (w *PipeConnWrapper) Close() error {
+	return nil
+}
+
+func (w *PipeConnWrapper) Read(b []byte) (n int, err error) {
+	return w.R.Read(b)
+}
+
+func (w *PipeConnWrapper) Write(p []byte) (n int, err error) {
+	n = len(p)
+	var mb buf.MultiBuffer
+	pLen := len(p)
+	for pLen > 0 {
+		buffer := buf.New()
+		if pLen > buf.Size {
+			_, err = buffer.Write(p[:buf.Size])
+			p = p[buf.Size:]
+		} else {
+			buffer.Write(p)
+		}
+		pLen -= int(buffer.Len())
+		mb = append(mb, buffer)
+	}
+	err = w.W.WriteMultiBuffer(mb)
+	if err != nil {
+		n = 0
+		buf.ReleaseMulti(mb)
+	}
+	return
+}

+ 66 - 0
common/singbridge/reader.go

@@ -0,0 +1,66 @@
+package singbridge
+
+import (
+	"time"
+
+	"github.com/sagernet/sing/common"
+	"github.com/sagernet/sing/common/bufio"
+	N "github.com/sagernet/sing/common/network"
+	"github.com/xtls/xray-core/common/buf"
+	"github.com/xtls/xray-core/common/net"
+)
+
+var (
+	_ buf.Reader        = (*Conn)(nil)
+	_ buf.TimeoutReader = (*Conn)(nil)
+	_ buf.Writer        = (*Conn)(nil)
+)
+
+type Conn struct {
+	net.Conn
+	writer N.VectorisedWriter
+}
+
+func NewConn(conn net.Conn) *Conn {
+	writer, _ := bufio.CreateVectorisedWriter(conn)
+	return &Conn{
+		Conn:   conn,
+		writer: writer,
+	}
+}
+
+func (c *Conn) ReadMultiBuffer() (buf.MultiBuffer, error) {
+	buffer, err := buf.ReadBuffer(c.Conn)
+	if err != nil {
+		return nil, err
+	}
+	return buf.MultiBuffer{buffer}, nil
+}
+
+func (c *Conn) ReadMultiBufferTimeout(duration time.Duration) (buf.MultiBuffer, error) {
+	err := c.SetReadDeadline(time.Now().Add(duration))
+	if err != nil {
+		return nil, err
+	}
+	defer c.SetReadDeadline(time.Time{})
+	return c.ReadMultiBuffer()
+}
+
+func (c *Conn) WriteMultiBuffer(bufferList buf.MultiBuffer) error {
+	defer buf.ReleaseMulti(bufferList)
+	if c.writer != nil {
+		bytesList := make([][]byte, len(bufferList))
+		for i, buffer := range bufferList {
+			bytesList[i] = buffer.Bytes()
+		}
+		return common.Error(bufio.WriteVectorised(c.writer, bytesList))
+	}
+	// Since this conn is only used by tun, we don't force buffer writes to merge.
+	for _, buffer := range bufferList {
+		_, err := c.Conn.Write(buffer.Bytes())
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 1 - 1
go.mod

@@ -13,7 +13,7 @@ require (
 	github.com/pires/go-proxyproto v0.7.0
 	github.com/quic-go/quic-go v0.34.0
 	github.com/refraction-networking/utls v1.3.2
-	github.com/sagernet/sing v0.2.3
+	github.com/sagernet/sing v0.2.4
 	github.com/sagernet/sing-shadowsocks v0.2.1
 	github.com/sagernet/wireguard-go v0.0.0-20221116151939-c99467f53f2c
 	github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb

+ 2 - 0
go.sum

@@ -145,6 +145,8 @@ github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3/go.mod h1:HgjTstv
 github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
 github.com/sagernet/sing v0.2.3 h1:V50MvZ4c3Iij2lYFWPlzL1PyipwSzjGeN9x+Ox89vpk=
 github.com/sagernet/sing v0.2.3/go.mod h1:Ta8nHnDLAwqySzKhGoKk4ZIB+vJ3GTKj7UPrWYvM+4w=
+github.com/sagernet/sing v0.2.4 h1:gC8BR5sglbJZX23RtMyFa8EETP9YEUADhfbEzU1yVbo=
+github.com/sagernet/sing v0.2.4/go.mod h1:Ta8nHnDLAwqySzKhGoKk4ZIB+vJ3GTKj7UPrWYvM+4w=
 github.com/sagernet/sing-shadowsocks v0.2.1 h1:FvdLQOqpvxHBJUcUe4fvgiYP2XLLwH5i1DtXQviVEPw=
 github.com/sagernet/sing-shadowsocks v0.2.1/go.mod h1:T/OgurSjsAe+Ug3+6PprXjmgHFmJidjOvQcjXGTKb3I=
 github.com/sagernet/wireguard-go v0.0.0-20221116151939-c99467f53f2c h1:vK2wyt9aWYHHvNLWniwijBu/n4pySypiKRhN32u/JGo=

+ 7 - 11
proxy/shadowsocks_2022/inbound.go

@@ -17,6 +17,7 @@ import (
 	"github.com/xtls/xray-core/common/net"
 	"github.com/xtls/xray-core/common/protocol"
 	"github.com/xtls/xray-core/common/session"
+	"github.com/xtls/xray-core/common/singbridge"
 	"github.com/xtls/xray-core/features/routing"
 	"github.com/xtls/xray-core/transport/internet/stat"
 )
@@ -74,7 +75,7 @@ func (i *Inbound) Process(ctx context.Context, network net.Network, connection s
 	ctx = session.ContextWithDispatcher(ctx, dispatcher)
 
 	if network == net.Network_TCP {
-		return returnError(i.service.NewConnection(ctx, connection, metadata))
+		return singbridge.ReturnError(i.service.NewConnection(ctx, connection, metadata))
 	} else {
 		reader := buf.NewReader(connection)
 		pc := &natPacketConn{connection}
@@ -82,7 +83,7 @@ func (i *Inbound) Process(ctx context.Context, network net.Network, connection s
 			mb, err := reader.ReadMultiBuffer()
 			if err != nil {
 				buf.ReleaseMulti(mb)
-				return returnError(err)
+				return singbridge.ReturnError(err)
 			}
 			for _, buffer := range mb {
 				packet := B.As(buffer.Bytes()).ToOwned()
@@ -112,16 +113,11 @@ func (i *Inbound) NewConnection(ctx context.Context, conn net.Conn, metadata M.M
 	})
 	newError("tunnelling request to tcp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx))
 	dispatcher := session.DispatcherFromContext(ctx)
-	link, err := dispatcher.Dispatch(ctx, toDestination(metadata.Destination, net.Network_TCP))
+	link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP))
 	if err != nil {
 		return err
 	}
-	outConn := &pipeConnWrapper{
-		&buf.BufferedReader{Reader: link.Reader},
-		link.Writer,
-		conn,
-	}
-	return bufio.CopyConn(ctx, conn, outConn)
+	return singbridge.CopyConn(ctx, nil, link, conn)
 }
 
 func (i *Inbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error {
@@ -138,12 +134,12 @@ func (i *Inbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, me
 	})
 	newError("tunnelling request to udp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx))
 	dispatcher := session.DispatcherFromContext(ctx)
-	destination := toDestination(metadata.Destination, net.Network_UDP)
+	destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP)
 	link, err := dispatcher.Dispatch(ctx, destination)
 	if err != nil {
 		return err
 	}
-	outConn := &packetConnWrapper{
+	outConn := &singbridge.PacketConnWrapper{
 		Reader: link.Reader,
 		Writer: link.Writer,
 		Dest:   destination,

+ 7 - 11
proxy/shadowsocks_2022/inbound_multi.go

@@ -21,6 +21,7 @@ import (
 	"github.com/xtls/xray-core/common/net"
 	"github.com/xtls/xray-core/common/protocol"
 	"github.com/xtls/xray-core/common/session"
+	"github.com/xtls/xray-core/common/singbridge"
 	"github.com/xtls/xray-core/common/uuid"
 	"github.com/xtls/xray-core/features/routing"
 	"github.com/xtls/xray-core/transport/internet/stat"
@@ -163,7 +164,7 @@ func (i *MultiUserInbound) Process(ctx context.Context, network net.Network, con
 	ctx = session.ContextWithDispatcher(ctx, dispatcher)
 
 	if network == net.Network_TCP {
-		return returnError(i.service.NewConnection(ctx, connection, metadata))
+		return singbridge.ReturnError(i.service.NewConnection(ctx, connection, metadata))
 	} else {
 		reader := buf.NewReader(connection)
 		pc := &natPacketConn{connection}
@@ -171,7 +172,7 @@ func (i *MultiUserInbound) Process(ctx context.Context, network net.Network, con
 			mb, err := reader.ReadMultiBuffer()
 			if err != nil {
 				buf.ReleaseMulti(mb)
-				return returnError(err)
+				return singbridge.ReturnError(err)
 			}
 			for _, buffer := range mb {
 				packet := B.As(buffer.Bytes()).ToOwned()
@@ -203,16 +204,11 @@ func (i *MultiUserInbound) NewConnection(ctx context.Context, conn net.Conn, met
 	})
 	newError("tunnelling request to tcp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx))
 	dispatcher := session.DispatcherFromContext(ctx)
-	link, err := dispatcher.Dispatch(ctx, toDestination(metadata.Destination, net.Network_TCP))
+	link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP))
 	if err != nil {
 		return err
 	}
-	outConn := &pipeConnWrapper{
-		&buf.BufferedReader{Reader: link.Reader},
-		link.Writer,
-		conn,
-	}
-	return bufio.CopyConn(ctx, conn, outConn)
+	return singbridge.CopyConn(ctx, conn, link, conn)
 }
 
 func (i *MultiUserInbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error {
@@ -231,12 +227,12 @@ func (i *MultiUserInbound) NewPacketConnection(ctx context.Context, conn N.Packe
 	})
 	newError("tunnelling request to udp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx))
 	dispatcher := session.DispatcherFromContext(ctx)
-	destination := toDestination(metadata.Destination, net.Network_UDP)
+	destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP)
 	link, err := dispatcher.Dispatch(ctx, destination)
 	if err != nil {
 		return err
 	}
-	outConn := &packetConnWrapper{
+	outConn := &singbridge.PacketConnWrapper{
 		Reader: link.Reader,
 		Writer: link.Writer,
 		Dest:   destination,

+ 8 - 12
proxy/shadowsocks_2022/inbound_relay.go

@@ -19,6 +19,7 @@ import (
 	"github.com/xtls/xray-core/common/net"
 	"github.com/xtls/xray-core/common/protocol"
 	"github.com/xtls/xray-core/common/session"
+	"github.com/xtls/xray-core/common/singbridge"
 	"github.com/xtls/xray-core/common/uuid"
 	"github.com/xtls/xray-core/features/routing"
 	"github.com/xtls/xray-core/transport/internet/stat"
@@ -66,7 +67,7 @@ func NewRelayServer(ctx context.Context, config *RelayServerConfig) (*RelayInbou
 		C.MapIndexed(config.Destinations, func(index int, it *RelayDestination) int { return index }),
 		C.Map(config.Destinations, func(it *RelayDestination) string { return it.Key }),
 		C.Map(config.Destinations, func(it *RelayDestination) M.Socksaddr {
-			return toSocksaddr(net.Destination{
+			return singbridge.ToSocksaddr(net.Destination{
 				Address: it.Address.AsAddress(),
 				Port:    net.Port(it.Port),
 			})
@@ -95,7 +96,7 @@ func (i *RelayInbound) Process(ctx context.Context, network net.Network, connect
 	ctx = session.ContextWithDispatcher(ctx, dispatcher)
 
 	if network == net.Network_TCP {
-		return returnError(i.service.NewConnection(ctx, connection, metadata))
+		return singbridge.ReturnError(i.service.NewConnection(ctx, connection, metadata))
 	} else {
 		reader := buf.NewReader(connection)
 		pc := &natPacketConn{connection}
@@ -103,7 +104,7 @@ func (i *RelayInbound) Process(ctx context.Context, network net.Network, connect
 			mb, err := reader.ReadMultiBuffer()
 			if err != nil {
 				buf.ReleaseMulti(mb)
-				return returnError(err)
+				return singbridge.ReturnError(err)
 			}
 			for _, buffer := range mb {
 				packet := B.As(buffer.Bytes()).ToOwned()
@@ -135,16 +136,11 @@ func (i *RelayInbound) NewConnection(ctx context.Context, conn net.Conn, metadat
 	})
 	newError("tunnelling request to tcp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx))
 	dispatcher := session.DispatcherFromContext(ctx)
-	link, err := dispatcher.Dispatch(ctx, toDestination(metadata.Destination, net.Network_TCP))
+	link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP))
 	if err != nil {
 		return err
 	}
-	outConn := &pipeConnWrapper{
-		&buf.BufferedReader{Reader: link.Reader},
-		link.Writer,
-		conn,
-	}
-	return bufio.CopyConn(ctx, conn, outConn)
+	return singbridge.CopyConn(ctx, nil, link, conn)
 }
 
 func (i *RelayInbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error {
@@ -163,12 +159,12 @@ func (i *RelayInbound) NewPacketConnection(ctx context.Context, conn N.PacketCon
 	})
 	newError("tunnelling request to udp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx))
 	dispatcher := session.DispatcherFromContext(ctx)
-	destination := toDestination(metadata.Destination, net.Network_UDP)
+	destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP)
 	link, err := dispatcher.Dispatch(ctx, destination)
 	if err != nil {
 		return err
 	}
-	outConn := &packetConnWrapper{
+	outConn := &singbridge.PacketConnWrapper{
 		Reader: link.Reader,
 		Writer: link.Writer,
 		Dest:   destination,

+ 7 - 17
proxy/shadowsocks_2022/outbound.go

@@ -2,7 +2,6 @@ package shadowsocks_2022
 
 import (
 	"context"
-	"io"
 	"runtime"
 	"time"
 
@@ -17,6 +16,7 @@ import (
 	"github.com/xtls/xray-core/common/buf"
 	"github.com/xtls/xray-core/common/net"
 	"github.com/xtls/xray-core/common/session"
+	"github.com/xtls/xray-core/common/singbridge"
 	"github.com/xtls/xray-core/transport"
 	"github.com/xtls/xray-core/transport/internet"
 )
@@ -93,7 +93,7 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int
 	}
 
 	if network == net.Network_TCP {
-		serverConn := o.method.DialEarlyConn(connection, toSocksaddr(destination))
+		serverConn := o.method.DialEarlyConn(connection, singbridge.ToSocksaddr(destination))
 		var handshake bool
 		if timeoutReader, isTimeoutReader := link.Reader.(buf.TimeoutReader); isTimeoutReader {
 			mb, err := timeoutReader.ReadMultiBufferTimeout(time.Millisecond * 100)
@@ -128,17 +128,7 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int
 				return newError("client handshake").Base(err)
 			}
 		}
-		conn := &pipeConnWrapper{
-			W:    link.Writer,
-			Conn: inboundConn,
-		}
-		if ir, ok := link.Reader.(io.Reader); ok {
-			conn.R = ir
-		} else {
-			conn.R = &buf.BufferedReader{Reader: link.Reader}
-		}
-
-		return returnError(bufio.CopyConn(ctx, conn, serverConn))
+		return singbridge.CopyConn(ctx, inboundConn, link, serverConn)
 	} else {
 		var packetConn N.PacketConn
 		if pc, isPacketConn := inboundConn.(N.PacketConn); isPacketConn {
@@ -146,7 +136,7 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int
 		} else if nc, isNetPacket := inboundConn.(net.PacketConn); isNetPacket {
 			packetConn = bufio.NewPacketConn(nc)
 		} else {
-			packetConn = &packetConnWrapper{
+			packetConn = &singbridge.PacketConnWrapper{
 				Reader: link.Reader,
 				Writer: link.Writer,
 				Conn:   inboundConn,
@@ -155,14 +145,14 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int
 		}
 
 		if o.uotClient != nil {
-			uConn, err := o.uotClient.DialEarlyConn(o.method.DialEarlyConn(connection, uot.RequestDestination(o.uotClient.Version)), false, toSocksaddr(destination))
+			uConn, err := o.uotClient.DialEarlyConn(o.method.DialEarlyConn(connection, uot.RequestDestination(o.uotClient.Version)), false, singbridge.ToSocksaddr(destination))
 			if err != nil {
 				return err
 			}
-			return returnError(bufio.CopyPacketConn(ctx, packetConn, uConn))
+			return singbridge.ReturnError(bufio.CopyPacketConn(ctx, packetConn, uConn))
 		} else {
 			serverConn := o.method.DialPacketConn(connection)
-			return returnError(bufio.CopyPacketConn(ctx, packetConn, serverConn))
+			return singbridge.ReturnError(bufio.CopyPacketConn(ctx, packetConn, serverConn))
 		}
 	}
 }

+ 0 - 142
proxy/shadowsocks_2022/shadowsocks_2022.go

@@ -1,145 +1,3 @@
 package shadowsocks_2022
 
-import (
-	"io"
-
-	B "github.com/sagernet/sing/common/buf"
-	E "github.com/sagernet/sing/common/exceptions"
-	M "github.com/sagernet/sing/common/metadata"
-	"github.com/xtls/xray-core/common/buf"
-	"github.com/xtls/xray-core/common/net"
-)
-
 //go:generate go run github.com/xtls/xray-core/common/errors/errorgen
-
-func toDestination(socksaddr M.Socksaddr, network net.Network) net.Destination {
-	if socksaddr.IsFqdn() {
-		return net.Destination{
-			Network: network,
-			Address: net.DomainAddress(socksaddr.Fqdn),
-			Port:    net.Port(socksaddr.Port),
-		}
-	} else {
-		return net.Destination{
-			Network: network,
-			Address: net.IPAddress(socksaddr.Addr.AsSlice()),
-			Port:    net.Port(socksaddr.Port),
-		}
-	}
-}
-
-func toSocksaddr(destination net.Destination) M.Socksaddr {
-	var addr M.Socksaddr
-	switch destination.Address.Family() {
-	case net.AddressFamilyDomain:
-		addr.Fqdn = destination.Address.Domain()
-	default:
-		addr.Addr = M.AddrFromIP(destination.Address.IP())
-	}
-	addr.Port = uint16(destination.Port)
-	return addr
-}
-
-type pipeConnWrapper struct {
-	R io.Reader
-	W buf.Writer
-	net.Conn
-}
-
-func (w *pipeConnWrapper) Close() error {
-	return nil
-}
-
-func (w *pipeConnWrapper) Read(b []byte) (n int, err error) {
-	return w.R.Read(b)
-}
-
-func (w *pipeConnWrapper) Write(p []byte) (n int, err error) {
-	n = len(p)
-	var mb buf.MultiBuffer
-	pLen := len(p)
-	for pLen > 0 {
-		buffer := buf.New()
-		if pLen > buf.Size {
-			_, err = buffer.Write(p[:buf.Size])
-			p = p[buf.Size:]
-		} else {
-			buffer.Write(p)
-		}
-		pLen -= int(buffer.Len())
-		mb = append(mb, buffer)
-	}
-	err = w.W.WriteMultiBuffer(mb)
-	if err != nil {
-		n = 0
-		buf.ReleaseMulti(mb)
-	}
-	return
-}
-
-type packetConnWrapper struct {
-	buf.Reader
-	buf.Writer
-	net.Conn
-	Dest   net.Destination
-	cached buf.MultiBuffer
-}
-
-func (w *packetConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) {
-	if w.cached != nil {
-		mb, bb := buf.SplitFirst(w.cached)
-		if bb == nil {
-			w.cached = nil
-		} else {
-			buffer.Write(bb.Bytes())
-			w.cached = mb
-			var destination net.Destination
-			if bb.UDP != nil {
-				destination = *bb.UDP
-			} else {
-				destination = w.Dest
-			}
-			bb.Release()
-			return toSocksaddr(destination), nil
-		}
-	}
-	mb, err := w.ReadMultiBuffer()
-	if err != nil {
-		return M.Socksaddr{}, err
-	}
-	nb, bb := buf.SplitFirst(mb)
-	if bb == nil {
-		return M.Socksaddr{}, nil
-	} else {
-		buffer.Write(bb.Bytes())
-		w.cached = nb
-		var destination net.Destination
-		if bb.UDP != nil {
-			destination = *bb.UDP
-		} else {
-			destination = w.Dest
-		}
-		bb.Release()
-		return toSocksaddr(destination), nil
-	}
-}
-
-func (w *packetConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) error {
-	vBuf := buf.New()
-	vBuf.Write(buffer.Bytes())
-	endpoint := toDestination(destination, net.Network_UDP)
-	vBuf.UDP = &endpoint
-	return w.Writer.WriteMultiBuffer(buf.MultiBuffer{vBuf})
-}
-
-func (w *packetConnWrapper) Close() error {
-	buf.ReleaseMulti(w.cached)
-	return nil
-}
-
-func returnError(err error) error {
-	if E.IsClosed(err) {
-		return nil
-	}
-	return err
-}

+ 8 - 8
transport/internet/system_dialer.go

@@ -5,6 +5,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/sagernet/sing/common/control"
 	"github.com/xtls/xray-core/common/net"
 	"github.com/xtls/xray-core/common/session"
 	"github.com/xtls/xray-core/features/dns"
@@ -18,7 +19,7 @@ type SystemDialer interface {
 }
 
 type DefaultSystemDialer struct {
-	controllers []controller
+	controllers []control.Func
 	dns         dns.Client
 	obm         outbound.Manager
 }
@@ -81,6 +82,11 @@ func (d *DefaultSystemDialer) Dial(ctx context.Context, src net.Address, dest ne
 
 	if sockopt != nil || len(d.controllers) > 0 {
 		dialer.Control = func(network, address string, c syscall.RawConn) error {
+			for _, ctl := range d.controllers {
+				if err := ctl(network, address, c); err != nil {
+					newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx))
+				}
+			}
 			return c.Control(func(fd uintptr) {
 				if sockopt != nil {
 					if err := applyOutboundSocketOptions(network, address, fd, sockopt); err != nil {
@@ -92,12 +98,6 @@ func (d *DefaultSystemDialer) Dial(ctx context.Context, src net.Address, dest ne
 						}
 					}
 				}
-
-				for _, ctl := range d.controllers {
-					if err := ctl(network, address, fd); err != nil {
-						newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx))
-					}
-				}
 			})
 		}
 	}
@@ -185,7 +185,7 @@ func UseAlternativeSystemDialer(dialer SystemDialer) {
 // It only works when effective dialer is the default dialer.
 //
 // xray:api:beta
-func RegisterDialerController(ctl func(network, address string, fd uintptr) error) error {
+func RegisterDialerController(ctl control.Func) error {
 	if ctl == nil {
 		return newError("nil listener controller")
 	}

+ 10 - 11
transport/internet/system_listener.go

@@ -10,21 +10,26 @@ import (
 	"time"
 
 	"github.com/pires/go-proxyproto"
+	"github.com/sagernet/sing/common/control"
 	"github.com/xtls/xray-core/common/net"
 	"github.com/xtls/xray-core/common/session"
 )
 
 var effectiveListener = DefaultListener{}
 
-type controller func(network, address string, fd uintptr) error
-
 type DefaultListener struct {
-	controllers []controller
+	controllers []control.Func
 }
 
-func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []controller) func(network, address string, c syscall.RawConn) error {
+func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []control.Func) func(network, address string, c syscall.RawConn) error {
 	return func(network, address string, c syscall.RawConn) error {
 		return c.Control(func(fd uintptr) {
+			for _, controller := range controllers {
+				if err := controller(network, address, c); err != nil {
+					newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx))
+				}
+			}
+
 			if sockopt != nil {
 				if err := applyInboundSocketOptions(network, fd, sockopt); err != nil {
 					newError("failed to apply socket options to incoming connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
@@ -32,12 +37,6 @@ func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []co
 			}
 
 			setReusePort(fd)
-
-			for _, controller := range controllers {
-				if err := controller(network, address, fd); err != nil {
-					newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx))
-				}
-			}
 		})
 	}
 }
@@ -117,7 +116,7 @@ func (dl *DefaultListener) ListenPacket(ctx context.Context, addr net.Addr, sock
 // The controller can be used to operate on file descriptors before they are put into use.
 //
 // xray:api:beta
-func RegisterListenerController(controller func(network, address string, fd uintptr) error) error {
+func RegisterListenerController(controller control.Func) error {
 	if controller == nil {
 		return newError("nil listener controller")
 	}

+ 7 - 3
transport/internet/system_listener_test.go

@@ -3,8 +3,10 @@ package internet_test
 import (
 	"context"
 	"net"
+	"syscall"
 	"testing"
 
+	"github.com/sagernet/sing/common/control"
 	"github.com/xtls/xray-core/common"
 	"github.com/xtls/xray-core/transport/internet"
 )
@@ -12,9 +14,11 @@ import (
 func TestRegisterListenerController(t *testing.T) {
 	var gotFd uintptr
 
-	common.Must(internet.RegisterListenerController(func(network string, addr string, fd uintptr) error {
-		gotFd = fd
-		return nil
+	common.Must(internet.RegisterListenerController(func(network, address string, conn syscall.RawConn) error {
+		return control.Raw(conn, func(fd uintptr) error {
+			gotFd = fd
+			return nil
+		})
 	}))
 
 	conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{