pipe.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package singbridge
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "time"
  7. "github.com/sagernet/sing/common/bufio"
  8. "github.com/xtls/xray-core/common"
  9. "github.com/xtls/xray-core/common/buf"
  10. "github.com/xtls/xray-core/transport"
  11. )
  12. func CopyConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, serverConn net.Conn) error {
  13. conn := &PipeConnWrapper{
  14. W: link.Writer,
  15. Conn: inboundConn,
  16. }
  17. if ir, ok := link.Reader.(io.Reader); ok {
  18. conn.R = ir
  19. } else {
  20. conn.R = &buf.BufferedReader{Reader: link.Reader}
  21. }
  22. return ReturnError(bufio.CopyConn(ctx, conn, serverConn))
  23. }
  24. type PipeConnWrapper struct {
  25. R io.Reader
  26. W buf.Writer
  27. net.Conn
  28. }
  29. func (w *PipeConnWrapper) Close() error {
  30. return nil
  31. }
  32. // This Read implemented a timeout to avoid goroutine leak.
  33. // as a temporarily solution
  34. func (w *PipeConnWrapper) Read(b []byte) (n int, err error) {
  35. type readResult struct {
  36. n int
  37. err error
  38. }
  39. c := make(chan readResult, 1)
  40. go func() {
  41. n, err := w.R.Read(b)
  42. c <- readResult{n: n, err: err}
  43. }()
  44. select {
  45. case result := <-c:
  46. return result.n, result.err
  47. case <-time.After(300 * time.Second):
  48. common.Close(w.R)
  49. common.Interrupt(w.R)
  50. return 0, buf.ErrReadTimeout
  51. }
  52. }
  53. func (w *PipeConnWrapper) Write(p []byte) (n int, err error) {
  54. n = len(p)
  55. var mb buf.MultiBuffer
  56. pLen := len(p)
  57. for pLen > 0 {
  58. buffer := buf.New()
  59. if pLen > buf.Size {
  60. _, err = buffer.Write(p[:buf.Size])
  61. p = p[buf.Size:]
  62. } else {
  63. buffer.Write(p)
  64. }
  65. pLen -= int(buffer.Len())
  66. mb = append(mb, buffer)
  67. }
  68. err = w.W.WriteMultiBuffer(mb)
  69. if err != nil {
  70. n = 0
  71. buf.ReleaseMulti(mb)
  72. }
  73. return
  74. }