packet.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package singbridge
  2. import (
  3. "context"
  4. "time"
  5. B "github.com/sagernet/sing/common/buf"
  6. "github.com/sagernet/sing/common/bufio"
  7. M "github.com/sagernet/sing/common/metadata"
  8. "github.com/xtls/xray-core/common"
  9. "github.com/xtls/xray-core/common/buf"
  10. "github.com/xtls/xray-core/common/net"
  11. "github.com/xtls/xray-core/transport"
  12. )
  13. func CopyPacketConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, destination net.Destination, serverConn net.PacketConn) error {
  14. conn := &PacketConnWrapper{
  15. Reader: link.Reader,
  16. Writer: link.Writer,
  17. Dest: destination,
  18. Conn: inboundConn,
  19. }
  20. return ReturnError(bufio.CopyPacketConn(ctx, conn, bufio.NewPacketConn(serverConn)))
  21. }
  22. type PacketConnWrapper struct {
  23. buf.Reader
  24. buf.Writer
  25. net.Conn
  26. Dest net.Destination
  27. cached buf.MultiBuffer
  28. }
  29. // This ReadPacket implemented a timeout to avoid goroutine leak like PipeConnWrapper.Read()
  30. // as a temporarily solution
  31. func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) {
  32. if w.cached != nil {
  33. mb, bb := buf.SplitFirst(w.cached)
  34. if bb == nil {
  35. w.cached = nil
  36. } else {
  37. buffer.Write(bb.Bytes())
  38. w.cached = mb
  39. var destination net.Destination
  40. if bb.UDP != nil {
  41. destination = *bb.UDP
  42. } else {
  43. destination = w.Dest
  44. }
  45. bb.Release()
  46. return ToSocksaddr(destination), nil
  47. }
  48. }
  49. // timeout
  50. type readResult struct {
  51. mb buf.MultiBuffer
  52. err error
  53. }
  54. c := make(chan readResult, 1)
  55. go func() {
  56. mb, err := w.ReadMultiBuffer()
  57. c <- readResult{mb: mb, err: err}
  58. }()
  59. var mb buf.MultiBuffer
  60. select {
  61. case <-time.After(60 * time.Second):
  62. common.Close(w.Reader)
  63. common.Interrupt(w.Reader)
  64. return M.Socksaddr{}, buf.ErrReadTimeout
  65. case result := <-c:
  66. if result.err != nil {
  67. return M.Socksaddr{}, result.err
  68. }
  69. mb = result.mb
  70. }
  71. nb, bb := buf.SplitFirst(mb)
  72. if bb == nil {
  73. return M.Socksaddr{}, nil
  74. } else {
  75. buffer.Write(bb.Bytes())
  76. w.cached = nb
  77. var destination net.Destination
  78. if bb.UDP != nil {
  79. destination = *bb.UDP
  80. } else {
  81. destination = w.Dest
  82. }
  83. bb.Release()
  84. return ToSocksaddr(destination), nil
  85. }
  86. }
  87. func (w *PacketConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) error {
  88. vBuf := buf.New()
  89. vBuf.Write(buffer.Bytes())
  90. endpoint := ToDestination(destination, net.Network_UDP)
  91. vBuf.UDP = &endpoint
  92. return w.Writer.WriteMultiBuffer(buf.MultiBuffer{vBuf})
  93. }
  94. func (w *PacketConnWrapper) Close() error {
  95. buf.ReleaseMulti(w.cached)
  96. return nil
  97. }