lz4stream.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "io"
  7. "sync"
  8. lz4 "github.com/bkaradzic/go-lz4"
  9. )
  10. const lz4Magic = 0x5e63b278
  11. type lz4Writer struct {
  12. wr io.Writer
  13. mut sync.Mutex
  14. buf []byte
  15. }
  16. func newLZ4Writer(w io.Writer) *lz4Writer {
  17. return &lz4Writer{wr: w}
  18. }
  19. func (w *lz4Writer) Write(bs []byte) (int, error) {
  20. w.mut.Lock()
  21. defer w.mut.Unlock()
  22. var err error
  23. w.buf, err = lz4.Encode(w.buf[:cap(w.buf)], bs)
  24. if err != nil {
  25. return 0, err
  26. }
  27. var hdr [8]byte
  28. binary.BigEndian.PutUint32(hdr[0:], lz4Magic)
  29. binary.BigEndian.PutUint32(hdr[4:], uint32(len(w.buf)))
  30. _, err = w.wr.Write(hdr[:])
  31. if err != nil {
  32. return 0, err
  33. }
  34. _, err = w.wr.Write(w.buf)
  35. if err != nil {
  36. return 0, err
  37. }
  38. if debug {
  39. l.Debugf("lz4 write; %d / %d bytes", len(bs), 8+len(w.buf))
  40. }
  41. return len(bs), nil
  42. }
  43. type lz4Reader struct {
  44. rd io.Reader
  45. mut sync.Mutex
  46. buf []byte
  47. ebuf []byte
  48. obuf *bytes.Buffer
  49. ibytes uint64
  50. obytes uint64
  51. }
  52. func newLZ4Reader(r io.Reader) *lz4Reader {
  53. return &lz4Reader{rd: r}
  54. }
  55. func (r *lz4Reader) Read(bs []byte) (int, error) {
  56. r.mut.Lock()
  57. defer r.mut.Unlock()
  58. if r.obuf == nil {
  59. r.obuf = bytes.NewBuffer(nil)
  60. }
  61. if r.obuf.Len() == 0 {
  62. if err := r.moreBits(); err != nil {
  63. return 0, err
  64. }
  65. }
  66. n, err := r.obuf.Read(bs)
  67. if debug {
  68. l.Debugf("lz4 read; %d bytes", n)
  69. }
  70. return n, err
  71. }
  72. func (r *lz4Reader) moreBits() error {
  73. var hdr [8]byte
  74. _, err := io.ReadFull(r.rd, hdr[:])
  75. if binary.BigEndian.Uint32(hdr[0:]) != lz4Magic {
  76. return errors.New("bad magic")
  77. }
  78. ln := int(binary.BigEndian.Uint32(hdr[4:]))
  79. if len(r.buf) < ln {
  80. r.buf = make([]byte, int(ln))
  81. } else {
  82. r.buf = r.buf[:ln]
  83. }
  84. _, err = io.ReadFull(r.rd, r.buf)
  85. if err != nil {
  86. return err
  87. }
  88. r.ebuf, err = lz4.Decode(r.ebuf[:cap(r.ebuf)], r.buf)
  89. if err != nil {
  90. return err
  91. }
  92. if debug {
  93. l.Debugf("lz4 moreBits: %d / %d bytes", ln+8, len(r.ebuf))
  94. }
  95. _, err = r.obuf.Write(r.ebuf)
  96. return err
  97. }