io.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package buf
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "os"
  7. "syscall"
  8. "time"
  9. "github.com/xtls/xray-core/common/errors"
  10. "github.com/xtls/xray-core/features/stats"
  11. "github.com/xtls/xray-core/transport/internet/stat"
  12. )
  13. // Reader extends io.Reader with MultiBuffer.
  14. type Reader interface {
  15. // ReadMultiBuffer reads content from underlying reader, and put it into a MultiBuffer.
  16. ReadMultiBuffer() (MultiBuffer, error)
  17. }
  18. // ErrReadTimeout is an error that happens with IO timeout.
  19. var ErrReadTimeout = errors.New("IO timeout")
  20. // TimeoutReader is a reader that returns error if Read() operation takes longer than the given timeout.
  21. type TimeoutReader interface {
  22. ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
  23. }
  24. // Writer extends io.Writer with MultiBuffer.
  25. type Writer interface {
  26. // WriteMultiBuffer writes a MultiBuffer into underlying writer.
  27. WriteMultiBuffer(MultiBuffer) error
  28. }
  29. // WriteAllBytes ensures all bytes are written into the given writer.
  30. func WriteAllBytes(writer io.Writer, payload []byte, c stats.Counter) error {
  31. wc := 0
  32. defer func() {
  33. if c != nil {
  34. c.Add(int64(wc))
  35. }
  36. }()
  37. for len(payload) > 0 {
  38. n, err := writer.Write(payload)
  39. wc += n
  40. if err != nil {
  41. return err
  42. }
  43. payload = payload[n:]
  44. }
  45. return nil
  46. }
  47. func isPacketReader(reader io.Reader) bool {
  48. _, ok := reader.(net.PacketConn)
  49. return ok
  50. }
  51. // NewReader creates a new Reader.
  52. // The Reader instance doesn't take the ownership of reader.
  53. func NewReader(reader io.Reader) Reader {
  54. if mr, ok := reader.(Reader); ok {
  55. return mr
  56. }
  57. if isPacketReader(reader) {
  58. return &PacketReader{
  59. Reader: reader,
  60. }
  61. }
  62. _, isFile := reader.(*os.File)
  63. if !isFile && useReadv {
  64. if sc, ok := reader.(syscall.Conn); ok {
  65. rawConn, err := sc.SyscallConn()
  66. if err != nil {
  67. errors.LogInfoInner(context.Background(), err, "failed to get sysconn")
  68. } else {
  69. var counter stats.Counter
  70. if statConn, ok := reader.(*stat.CounterConnection); ok {
  71. reader = statConn.Connection
  72. counter = statConn.ReadCounter
  73. }
  74. return NewReadVReader(reader, rawConn, counter)
  75. }
  76. }
  77. }
  78. return &SingleReader{
  79. Reader: reader,
  80. }
  81. }
  82. // NewPacketReader creates a new PacketReader based on the given reader.
  83. func NewPacketReader(reader io.Reader) Reader {
  84. if mr, ok := reader.(Reader); ok {
  85. return mr
  86. }
  87. return &PacketReader{
  88. Reader: reader,
  89. }
  90. }
  91. func isPacketWriter(writer io.Writer) bool {
  92. if _, ok := writer.(net.PacketConn); ok {
  93. return true
  94. }
  95. // If the writer doesn't implement syscall.Conn, it is probably not a TCP connection.
  96. if _, ok := writer.(syscall.Conn); !ok {
  97. return true
  98. }
  99. return false
  100. }
  101. // NewWriter creates a new Writer.
  102. func NewWriter(writer io.Writer) Writer {
  103. if mw, ok := writer.(Writer); ok {
  104. return mw
  105. }
  106. iConn := writer
  107. if statConn, ok := writer.(*stat.CounterConnection); ok {
  108. iConn = statConn.Connection
  109. }
  110. if isPacketWriter(iConn) {
  111. return &SequentialWriter{
  112. Writer: writer,
  113. }
  114. }
  115. var counter stats.Counter
  116. if statConn, ok := writer.(*stat.CounterConnection); ok {
  117. counter = statConn.WriteCounter
  118. }
  119. return &BufferToBytesWriter{
  120. Writer: iConn,
  121. counter: counter,
  122. }
  123. }