listener_udp.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package listener
  2. import (
  3. "net"
  4. "net/netip"
  5. "os"
  6. "github.com/sagernet/sing/common/buf"
  7. "github.com/sagernet/sing/common/control"
  8. E "github.com/sagernet/sing/common/exceptions"
  9. M "github.com/sagernet/sing/common/metadata"
  10. N "github.com/sagernet/sing/common/network"
  11. )
  12. func (l *Listener) ListenUDP() (net.PacketConn, error) {
  13. bindAddr := M.SocksaddrFrom(l.listenOptions.Listen.Build(netip.AddrFrom4([4]byte{127, 0, 0, 1})), l.listenOptions.ListenPort)
  14. var lc net.ListenConfig
  15. var udpFragment bool
  16. if l.listenOptions.UDPFragment != nil {
  17. udpFragment = *l.listenOptions.UDPFragment
  18. } else {
  19. udpFragment = l.listenOptions.UDPFragmentDefault
  20. }
  21. if !udpFragment {
  22. lc.Control = control.Append(lc.Control, control.DisableUDPFragment())
  23. }
  24. udpConn, err := lc.ListenPacket(l.ctx, M.NetworkFromNetAddr(N.NetworkUDP, bindAddr.Addr), bindAddr.String())
  25. if err != nil {
  26. return nil, err
  27. }
  28. l.udpConn = udpConn.(*net.UDPConn)
  29. l.udpAddr = bindAddr
  30. l.logger.Info("udp server started at ", udpConn.LocalAddr())
  31. return udpConn, err
  32. }
  33. func (l *Listener) UDPAddr() M.Socksaddr {
  34. return l.udpAddr
  35. }
  36. func (l *Listener) PacketWriter() N.PacketWriter {
  37. return (*packetWriter)(l)
  38. }
  39. func (l *Listener) loopUDPIn() {
  40. defer close(l.packetOutboundClosed)
  41. var buffer *buf.Buffer
  42. if !l.threadUnsafePacketWriter {
  43. buffer = buf.NewPacket()
  44. defer buffer.Release()
  45. buffer.IncRef()
  46. defer buffer.DecRef()
  47. }
  48. if l.oobPacketHandler != nil {
  49. oob := make([]byte, 1024)
  50. for {
  51. if l.threadUnsafePacketWriter {
  52. buffer = buf.NewPacket()
  53. } else {
  54. buffer.Reset()
  55. }
  56. n, oobN, _, addr, err := l.udpConn.ReadMsgUDPAddrPort(buffer.FreeBytes(), oob)
  57. if err != nil {
  58. if l.threadUnsafePacketWriter {
  59. buffer.Release()
  60. }
  61. if l.shutdown.Load() && E.IsClosed(err) {
  62. return
  63. }
  64. l.udpConn.Close()
  65. l.logger.Error("udp listener closed: ", err)
  66. return
  67. }
  68. buffer.Truncate(n)
  69. l.oobPacketHandler.NewPacketEx(buffer, oob[:oobN], M.SocksaddrFromNetIP(addr).Unwrap())
  70. }
  71. } else {
  72. for {
  73. if l.threadUnsafePacketWriter {
  74. buffer = buf.NewPacket()
  75. } else {
  76. buffer.Reset()
  77. }
  78. n, addr, err := l.udpConn.ReadFromUDPAddrPort(buffer.FreeBytes())
  79. if err != nil {
  80. if l.threadUnsafePacketWriter {
  81. buffer.Release()
  82. }
  83. if l.shutdown.Load() && E.IsClosed(err) {
  84. return
  85. }
  86. l.udpConn.Close()
  87. l.logger.Error("udp listener closed: ", err)
  88. return
  89. }
  90. buffer.Truncate(n)
  91. l.packetHandler.NewPacketEx(buffer, M.SocksaddrFromNetIP(addr).Unwrap())
  92. }
  93. }
  94. }
  95. func (l *Listener) loopUDPOut() {
  96. for {
  97. select {
  98. case packet := <-l.packetOutbound:
  99. destination := packet.Destination.AddrPort()
  100. _, err := l.udpConn.WriteToUDPAddrPort(packet.Buffer.Bytes(), destination)
  101. packet.Buffer.Release()
  102. N.PutPacketBuffer(packet)
  103. if err != nil {
  104. if l.shutdown.Load() && E.IsClosed(err) {
  105. return
  106. }
  107. l.udpConn.Close()
  108. l.logger.Error("udp listener write back: ", destination, ": ", err)
  109. return
  110. }
  111. continue
  112. case <-l.packetOutboundClosed:
  113. }
  114. for {
  115. select {
  116. case packet := <-l.packetOutbound:
  117. packet.Buffer.Release()
  118. N.PutPacketBuffer(packet)
  119. default:
  120. return
  121. }
  122. }
  123. }
  124. }
  125. type packetWriter Listener
  126. func (w *packetWriter) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
  127. packet := N.NewPacketBuffer()
  128. packet.Buffer = buffer
  129. packet.Destination = destination
  130. select {
  131. case w.packetOutbound <- packet:
  132. return nil
  133. default:
  134. buffer.Release()
  135. N.PutPacketBuffer(packet)
  136. if w.shutdown.Load() {
  137. return os.ErrClosed
  138. }
  139. w.logger.Trace("dropped packet to ", destination)
  140. return nil
  141. }
  142. }
  143. func (w *packetWriter) WriteIsThreadUnsafe() {
  144. }