reader.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package mux
  2. import (
  3. "io"
  4. "github.com/xtls/xray-core/common/buf"
  5. "github.com/xtls/xray-core/common/crypto"
  6. "github.com/xtls/xray-core/common/errors"
  7. "github.com/xtls/xray-core/common/net"
  8. "github.com/xtls/xray-core/common/serial"
  9. )
  10. // PacketReader is an io.Reader that reads whole chunk of Mux frames every time.
  11. type PacketReader struct {
  12. reader io.Reader
  13. eof bool
  14. dest *net.Destination
  15. }
  16. // NewPacketReader creates a new PacketReader.
  17. func NewPacketReader(reader io.Reader, dest *net.Destination) *PacketReader {
  18. return &PacketReader{
  19. reader: reader,
  20. eof: false,
  21. dest: dest,
  22. }
  23. }
  24. // ReadMultiBuffer implements buf.Reader.
  25. func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
  26. if r.eof {
  27. return nil, io.EOF
  28. }
  29. size, err := serial.ReadUint16(r.reader)
  30. if err != nil {
  31. return nil, err
  32. }
  33. if size > buf.Size {
  34. return nil, errors.New("packet size too large: ", size)
  35. }
  36. b := buf.New()
  37. if _, err := b.ReadFullFrom(r.reader, int32(size)); err != nil {
  38. b.Release()
  39. return nil, err
  40. }
  41. r.eof = true
  42. if r.dest != nil && r.dest.Network == net.Network_UDP {
  43. b.UDP = r.dest
  44. }
  45. return buf.MultiBuffer{b}, nil
  46. }
  47. // NewStreamReader creates a new StreamReader.
  48. func NewStreamReader(reader *buf.BufferedReader) buf.Reader {
  49. return crypto.NewChunkStreamReaderWithChunkCount(crypto.PlainChunkSizeParser{}, reader, 1)
  50. }