listener_udp.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package listener
  2. import (
  3. "net"
  4. "os"
  5. "github.com/sagernet/sing/common/buf"
  6. "github.com/sagernet/sing/common/control"
  7. E "github.com/sagernet/sing/common/exceptions"
  8. M "github.com/sagernet/sing/common/metadata"
  9. N "github.com/sagernet/sing/common/network"
  10. )
  11. func (l *Listener) ListenUDP() (net.PacketConn, error) {
  12. bindAddr := M.SocksaddrFrom(l.listenOptions.Listen.Build(), l.listenOptions.ListenPort)
  13. var lc net.ListenConfig
  14. var udpFragment bool
  15. if l.listenOptions.UDPFragment != nil {
  16. udpFragment = *l.listenOptions.UDPFragment
  17. } else {
  18. udpFragment = l.listenOptions.UDPFragmentDefault
  19. }
  20. if !udpFragment {
  21. lc.Control = control.Append(lc.Control, control.DisableUDPFragment())
  22. }
  23. udpConn, err := lc.ListenPacket(l.ctx, M.NetworkFromNetAddr(N.NetworkUDP, bindAddr.Addr), bindAddr.String())
  24. if err != nil {
  25. return nil, err
  26. }
  27. l.udpConn = udpConn.(*net.UDPConn)
  28. l.udpAddr = bindAddr
  29. l.logger.Info("udp server started at ", udpConn.LocalAddr())
  30. return udpConn, err
  31. }
  32. func (l *Listener) UDPAddr() M.Socksaddr {
  33. return l.udpAddr
  34. }
  35. func (l *Listener) PacketWriter() N.PacketWriter {
  36. return (*packetWriter)(l)
  37. }
  38. func (l *Listener) loopUDPIn() {
  39. defer close(l.packetOutboundClosed)
  40. var buffer *buf.Buffer
  41. if !l.threadUnsafePacketWriter {
  42. buffer = buf.NewPacket()
  43. defer buffer.Release()
  44. buffer.IncRef()
  45. defer buffer.DecRef()
  46. }
  47. if l.oobPacketHandler != nil {
  48. oob := make([]byte, 1024)
  49. for {
  50. if l.threadUnsafePacketWriter {
  51. buffer = buf.NewPacket()
  52. } else {
  53. buffer.Reset()
  54. }
  55. n, oobN, _, addr, err := l.udpConn.ReadMsgUDPAddrPort(buffer.FreeBytes(), oob)
  56. if err != nil {
  57. if l.threadUnsafePacketWriter {
  58. buffer.Release()
  59. }
  60. if l.shutdown.Load() && E.IsClosed(err) {
  61. return
  62. }
  63. l.udpConn.Close()
  64. l.logger.Error("udp listener closed: ", err)
  65. return
  66. }
  67. buffer.Truncate(n)
  68. l.oobPacketHandler.NewPacketEx(buffer, oob[:oobN], M.SocksaddrFromNetIP(addr).Unwrap())
  69. }
  70. } else {
  71. for {
  72. if l.threadUnsafePacketWriter {
  73. buffer = buf.NewPacket()
  74. } else {
  75. buffer.Reset()
  76. }
  77. n, addr, err := l.udpConn.ReadFromUDPAddrPort(buffer.FreeBytes())
  78. if err != nil {
  79. if l.threadUnsafePacketWriter {
  80. buffer.Release()
  81. }
  82. if l.shutdown.Load() && E.IsClosed(err) {
  83. return
  84. }
  85. l.udpConn.Close()
  86. l.logger.Error("udp listener closed: ", err)
  87. return
  88. }
  89. buffer.Truncate(n)
  90. l.packetHandler.NewPacketEx(buffer, M.SocksaddrFromNetIP(addr).Unwrap())
  91. }
  92. }
  93. }
  94. func (l *Listener) loopUDPOut() {
  95. for {
  96. select {
  97. case packet := <-l.packetOutbound:
  98. destination := packet.Destination.AddrPort()
  99. _, err := l.udpConn.WriteToUDPAddrPort(packet.Buffer.Bytes(), destination)
  100. packet.Buffer.Release()
  101. N.PutPacketBuffer(packet)
  102. if err != nil {
  103. if l.shutdown.Load() && E.IsClosed(err) {
  104. return
  105. }
  106. l.udpConn.Close()
  107. l.logger.Error("udp listener write back: ", destination, ": ", err)
  108. return
  109. }
  110. continue
  111. case <-l.packetOutboundClosed:
  112. }
  113. for {
  114. select {
  115. case packet := <-l.packetOutbound:
  116. packet.Buffer.Release()
  117. N.PutPacketBuffer(packet)
  118. default:
  119. return
  120. }
  121. }
  122. }
  123. }
  124. type packetWriter Listener
  125. func (w *packetWriter) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
  126. packet := N.NewPacketBuffer()
  127. packet.Buffer = buffer
  128. packet.Destination = destination
  129. select {
  130. case w.packetOutbound <- packet:
  131. return nil
  132. default:
  133. buffer.Release()
  134. N.PutPacketBuffer(packet)
  135. if w.shutdown.Load() {
  136. return os.ErrClosed
  137. }
  138. w.logger.Trace("dropped packet to ", destination)
  139. return nil
  140. }
  141. }
  142. func (w *packetWriter) WriteIsThreadUnsafe() {
  143. }