listener_udp.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package listener
  2. import (
  3. "context"
  4. "net"
  5. "net/netip"
  6. "os"
  7. "github.com/sagernet/sing/common/buf"
  8. "github.com/sagernet/sing/common/control"
  9. E "github.com/sagernet/sing/common/exceptions"
  10. M "github.com/sagernet/sing/common/metadata"
  11. N "github.com/sagernet/sing/common/network"
  12. )
  13. func (l *Listener) ListenUDP() (net.PacketConn, error) {
  14. bindAddr := M.SocksaddrFrom(l.listenOptions.Listen.Build(netip.AddrFrom4([4]byte{127, 0, 0, 1})), l.listenOptions.ListenPort)
  15. var lc net.ListenConfig
  16. var udpFragment bool
  17. if l.listenOptions.UDPFragment != nil {
  18. udpFragment = *l.listenOptions.UDPFragment
  19. } else {
  20. udpFragment = l.listenOptions.UDPFragmentDefault
  21. }
  22. if !udpFragment {
  23. lc.Control = control.Append(lc.Control, control.DisableUDPFragment())
  24. }
  25. udpConn, err := ListenNetworkNamespace[net.PacketConn](l.listenOptions.NetNs, func() (net.PacketConn, error) {
  26. return lc.ListenPacket(l.ctx, M.NetworkFromNetAddr(N.NetworkUDP, bindAddr.Addr), bindAddr.String())
  27. })
  28. if err != nil {
  29. return nil, err
  30. }
  31. l.udpConn = udpConn.(*net.UDPConn)
  32. l.udpAddr = bindAddr
  33. l.logger.Info("udp server started at ", udpConn.LocalAddr())
  34. return udpConn, err
  35. }
  36. func (l *Listener) DialContext(dialer net.Dialer, ctx context.Context, network string, address string) (net.Conn, error) {
  37. return ListenNetworkNamespace[net.Conn](l.listenOptions.NetNs, func() (net.Conn, error) {
  38. return dialer.DialContext(ctx, network, address)
  39. })
  40. }
  41. func (l *Listener) ListenPacket(listenConfig net.ListenConfig, ctx context.Context, network string, address string) (net.PacketConn, error) {
  42. return ListenNetworkNamespace[net.PacketConn](l.listenOptions.NetNs, func() (net.PacketConn, error) {
  43. return listenConfig.ListenPacket(ctx, network, address)
  44. })
  45. }
  46. func (l *Listener) UDPAddr() M.Socksaddr {
  47. return l.udpAddr
  48. }
  49. func (l *Listener) PacketWriter() N.PacketWriter {
  50. return (*packetWriter)(l)
  51. }
  52. func (l *Listener) loopUDPIn() {
  53. defer close(l.packetOutboundClosed)
  54. var buffer *buf.Buffer
  55. if !l.threadUnsafePacketWriter {
  56. buffer = buf.NewPacket()
  57. defer buffer.Release()
  58. buffer.IncRef()
  59. defer buffer.DecRef()
  60. }
  61. if l.oobPacketHandler != nil {
  62. oob := make([]byte, 1024)
  63. for {
  64. if l.threadUnsafePacketWriter {
  65. buffer = buf.NewPacket()
  66. } else {
  67. buffer.Reset()
  68. }
  69. n, oobN, _, addr, err := l.udpConn.ReadMsgUDPAddrPort(buffer.FreeBytes(), oob)
  70. if err != nil {
  71. if l.threadUnsafePacketWriter {
  72. buffer.Release()
  73. }
  74. if l.shutdown.Load() && E.IsClosed(err) {
  75. return
  76. }
  77. l.udpConn.Close()
  78. l.logger.Error("udp listener closed: ", err)
  79. return
  80. }
  81. buffer.Truncate(n)
  82. l.oobPacketHandler.NewPacketEx(buffer, oob[:oobN], M.SocksaddrFromNetIP(addr).Unwrap())
  83. }
  84. } else {
  85. for {
  86. if l.threadUnsafePacketWriter {
  87. buffer = buf.NewPacket()
  88. } else {
  89. buffer.Reset()
  90. }
  91. n, addr, err := l.udpConn.ReadFromUDPAddrPort(buffer.FreeBytes())
  92. if err != nil {
  93. if l.threadUnsafePacketWriter {
  94. buffer.Release()
  95. }
  96. if l.shutdown.Load() && E.IsClosed(err) {
  97. return
  98. }
  99. l.udpConn.Close()
  100. l.logger.Error("udp listener closed: ", err)
  101. return
  102. }
  103. buffer.Truncate(n)
  104. l.packetHandler.NewPacketEx(buffer, M.SocksaddrFromNetIP(addr).Unwrap())
  105. }
  106. }
  107. }
  108. func (l *Listener) loopUDPOut() {
  109. for {
  110. select {
  111. case packet := <-l.packetOutbound:
  112. destination := packet.Destination.AddrPort()
  113. _, err := l.udpConn.WriteToUDPAddrPort(packet.Buffer.Bytes(), destination)
  114. packet.Buffer.Release()
  115. N.PutPacketBuffer(packet)
  116. if err != nil {
  117. if l.shutdown.Load() && E.IsClosed(err) {
  118. return
  119. }
  120. l.udpConn.Close()
  121. l.logger.Error("udp listener write back: ", destination, ": ", err)
  122. return
  123. }
  124. continue
  125. case <-l.packetOutboundClosed:
  126. }
  127. for {
  128. select {
  129. case packet := <-l.packetOutbound:
  130. packet.Buffer.Release()
  131. N.PutPacketBuffer(packet)
  132. default:
  133. return
  134. }
  135. }
  136. }
  137. }
  138. type packetWriter Listener
  139. func (w *packetWriter) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
  140. packet := N.NewPacketBuffer()
  141. packet.Buffer = buffer
  142. packet.Destination = destination
  143. select {
  144. case w.packetOutbound <- packet:
  145. return nil
  146. default:
  147. buffer.Release()
  148. N.PutPacketBuffer(packet)
  149. if w.shutdown.Load() {
  150. return os.ErrClosed
  151. }
  152. w.logger.Trace("dropped packet to ", destination)
  153. return nil
  154. }
  155. }
  156. func (w *packetWriter) WriteIsThreadUnsafe() {
  157. }