|
@@ -23,7 +23,7 @@ import (
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
- lz4 "github.com/bkaradzic/go-lz4"
|
|
|
+ lz4 "github.com/pierrec/lz4/v4"
|
|
|
"github.com/pkg/errors"
|
|
|
)
|
|
|
|
|
@@ -63,6 +63,10 @@ var sha256OfEmptyBlock = map[int][sha256.Size]byte{
|
|
|
16 << MiB: {0x8, 0xa, 0xcf, 0x35, 0xa5, 0x7, 0xac, 0x98, 0x49, 0xcf, 0xcb, 0xa4, 0x7d, 0xc2, 0xad, 0x83, 0xe0, 0x1b, 0x75, 0x66, 0x3a, 0x51, 0x62, 0x79, 0xc8, 0xb9, 0xd2, 0x43, 0xb7, 0x19, 0x64, 0x3e},
|
|
|
}
|
|
|
|
|
|
+var (
|
|
|
+ errNotCompressible = errors.New("not compressible")
|
|
|
+)
|
|
|
+
|
|
|
func init() {
|
|
|
for blockSize := MinBlockSize; blockSize <= MaxBlockSize; blockSize *= 2 {
|
|
|
BlockSizes = append(BlockSizes, blockSize)
|
|
@@ -800,7 +804,7 @@ func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte, ov
|
|
|
}
|
|
|
|
|
|
cOverhead := 2 + hdrSize + 4
|
|
|
- maxCompressed := cOverhead + lz4.CompressBound(len(marshaled))
|
|
|
+ maxCompressed := cOverhead + lz4.CompressBlockBound(len(marshaled))
|
|
|
buf := BufferPool.Get(maxCompressed)
|
|
|
defer BufferPool.Put(buf)
|
|
|
|
|
@@ -1010,32 +1014,30 @@ func (c *rawConnection) Statistics() Statistics {
|
|
|
}
|
|
|
|
|
|
func lz4Compress(src, buf []byte) (int, error) {
|
|
|
- compressed, err := lz4.Encode(buf, src)
|
|
|
+ // The compressed block is prefixed by the size of the uncompressed data.
|
|
|
+ binary.BigEndian.PutUint32(buf, uint32(len(src)))
|
|
|
+
|
|
|
+ n, err := lz4.CompressBlock(src, buf[4:], nil)
|
|
|
if err != nil {
|
|
|
return -1, err
|
|
|
- }
|
|
|
- if &compressed[0] != &buf[0] {
|
|
|
- panic("bug: lz4.Compress allocated, which it must not (should use buffer pool)")
|
|
|
+ } else if len(src) > 0 && n == 0 {
|
|
|
+ return -1, errNotCompressible
|
|
|
}
|
|
|
|
|
|
- binary.BigEndian.PutUint32(compressed, binary.LittleEndian.Uint32(compressed))
|
|
|
- return len(compressed), nil
|
|
|
+ return n + 4, nil
|
|
|
}
|
|
|
|
|
|
func lz4Decompress(src []byte) ([]byte, error) {
|
|
|
size := binary.BigEndian.Uint32(src)
|
|
|
- binary.LittleEndian.PutUint32(src, size)
|
|
|
- var err error
|
|
|
buf := BufferPool.Get(int(size))
|
|
|
- decoded, err := lz4.Decode(buf, src)
|
|
|
+
|
|
|
+ n, err := lz4.UncompressBlock(src[4:], buf)
|
|
|
if err != nil {
|
|
|
BufferPool.Put(buf)
|
|
|
return nil, err
|
|
|
}
|
|
|
- if &decoded[0] != &buf[0] {
|
|
|
- panic("bug: lz4.Decode allocated, which it must not (should use buffer pool)")
|
|
|
- }
|
|
|
- return decoded, nil
|
|
|
+
|
|
|
+ return buf[:n], nil
|
|
|
}
|
|
|
|
|
|
func newProtocolError(err error, msgContext string) error {
|