|
@@ -26,16 +26,17 @@ import (
|
|
|
var _ adapter.Inbound = (*myInboundAdapter)(nil)
|
|
|
|
|
|
type myInboundAdapter struct {
|
|
|
- protocol string
|
|
|
- network []string
|
|
|
- ctx context.Context
|
|
|
- router adapter.Router
|
|
|
- logger log.ContextLogger
|
|
|
- tag string
|
|
|
- listenOptions option.ListenOptions
|
|
|
- connHandler adapter.ConnectionHandler
|
|
|
- packetHandler adapter.PacketHandler
|
|
|
- packetUpstream any
|
|
|
+ protocol string
|
|
|
+ network []string
|
|
|
+ ctx context.Context
|
|
|
+ router adapter.Router
|
|
|
+ logger log.ContextLogger
|
|
|
+ tag string
|
|
|
+ listenOptions option.ListenOptions
|
|
|
+ connHandler adapter.ConnectionHandler
|
|
|
+ packetHandler adapter.PacketHandler
|
|
|
+ oobPacketHandler adapter.OOBPacketHandler
|
|
|
+ packetUpstream any
|
|
|
|
|
|
// http mixed
|
|
|
|
|
@@ -85,12 +86,20 @@ func (a *myInboundAdapter) Start() error {
|
|
|
a.packetForce6 = M.SocksaddrFromNet(udpConn.LocalAddr()).Addr.Is6()
|
|
|
a.packetOutboundClosed = make(chan struct{})
|
|
|
a.packetOutbound = make(chan *myInboundPacket)
|
|
|
- if _, threadUnsafeHandler := common.Cast[N.ThreadUnsafeWriter](a.packetUpstream); !threadUnsafeHandler {
|
|
|
- go a.loopUDPIn()
|
|
|
+ if a.oobPacketHandler != nil {
|
|
|
+ if _, threadUnsafeHandler := common.Cast[N.ThreadUnsafeWriter](a.packetUpstream); !threadUnsafeHandler {
|
|
|
+ go a.loopUDPOOBIn()
|
|
|
+ } else {
|
|
|
+ go a.loopUDPOOBInThreadSafe()
|
|
|
+ }
|
|
|
} else {
|
|
|
- go a.loopUDPInThreadSafe()
|
|
|
+ if _, threadUnsafeHandler := common.Cast[N.ThreadUnsafeWriter](a.packetUpstream); !threadUnsafeHandler {
|
|
|
+ go a.loopUDPIn()
|
|
|
+ } else {
|
|
|
+ go a.loopUDPInThreadSafe()
|
|
|
+ }
|
|
|
+ go a.loopUDPOut()
|
|
|
}
|
|
|
- go a.loopUDPOut()
|
|
|
a.logger.Info("udp server started at ", udpConn.LocalAddr())
|
|
|
}
|
|
|
if a.setSystemProxy {
|
|
@@ -194,6 +203,37 @@ func (a *myInboundAdapter) loopUDPIn() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (a *myInboundAdapter) loopUDPOOBIn() {
|
|
|
+ defer close(a.packetOutboundClosed)
|
|
|
+ _buffer := buf.StackNewPacket()
|
|
|
+ defer common.KeepAlive(_buffer)
|
|
|
+ buffer := common.Dup(_buffer)
|
|
|
+ defer buffer.Release()
|
|
|
+ buffer.IncRef()
|
|
|
+ defer buffer.DecRef()
|
|
|
+ packetService := (*myInboundPacketAdapter)(a)
|
|
|
+ oob := make([]byte, 1024)
|
|
|
+ for {
|
|
|
+ buffer.Reset()
|
|
|
+ n, oobN, _, addr, err := a.udpConn.ReadMsgUDPAddrPort(buffer.FreeBytes(), oob)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ buffer.Truncate(n)
|
|
|
+ var metadata adapter.InboundContext
|
|
|
+ metadata.Inbound = a.tag
|
|
|
+ metadata.SniffEnabled = a.listenOptions.SniffEnabled
|
|
|
+ metadata.SniffOverrideDestination = a.listenOptions.SniffOverrideDestination
|
|
|
+ metadata.DomainStrategy = dns.DomainStrategy(a.listenOptions.DomainStrategy)
|
|
|
+ metadata.Network = C.NetworkUDP
|
|
|
+ metadata.Source = M.SocksaddrFromNetIP(addr)
|
|
|
+ err = a.oobPacketHandler.NewPacket(a.ctx, packetService, buffer, oob[:oobN], metadata)
|
|
|
+ if err != nil {
|
|
|
+ a.newError(E.Cause(err, "process packet from ", metadata.Source))
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (a *myInboundAdapter) loopUDPInThreadSafe() {
|
|
|
defer close(a.packetOutboundClosed)
|
|
|
packetService := (*myInboundPacketAdapter)(a)
|
|
@@ -220,6 +260,33 @@ func (a *myInboundAdapter) loopUDPInThreadSafe() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (a *myInboundAdapter) loopUDPOOBInThreadSafe() {
|
|
|
+ defer close(a.packetOutboundClosed)
|
|
|
+ packetService := (*myInboundPacketAdapter)(a)
|
|
|
+ oob := make([]byte, 1024)
|
|
|
+ for {
|
|
|
+ buffer := buf.NewPacket()
|
|
|
+ n, oobN, _, addr, err := a.udpConn.ReadMsgUDPAddrPort(buffer.FreeBytes(), oob)
|
|
|
+ if err != nil {
|
|
|
+ buffer.Release()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ buffer.Truncate(n)
|
|
|
+ var metadata adapter.InboundContext
|
|
|
+ metadata.Inbound = a.tag
|
|
|
+ metadata.SniffEnabled = a.listenOptions.SniffEnabled
|
|
|
+ metadata.SniffOverrideDestination = a.listenOptions.SniffOverrideDestination
|
|
|
+ metadata.DomainStrategy = dns.DomainStrategy(a.listenOptions.DomainStrategy)
|
|
|
+ metadata.Network = C.NetworkUDP
|
|
|
+ metadata.Source = M.SocksaddrFromNetIP(addr)
|
|
|
+ err = a.oobPacketHandler.NewPacket(a.ctx, packetService, buffer, oob[:oobN], metadata)
|
|
|
+ if err != nil {
|
|
|
+ buffer.Release()
|
|
|
+ a.newError(E.Cause(err, "process packet from ", metadata.Source))
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (a *myInboundAdapter) loopUDPOut() {
|
|
|
for {
|
|
|
select {
|