Browse Source

Refactor compression support, now at message level.

Jakob Borg 11 years ago
parent
commit
6c5c14f35f
8 changed files with 415 additions and 246 deletions
  1. 28 41
      protocol/PROTOCOL.md
  2. 15 2
      protocol/counting.go
  3. 14 7
      protocol/header.go
  4. 6 0
      protocol/message.go
  5. 107 0
      protocol/message_xdr.go
  6. 213 146
      protocol/protocol.go
  7. 23 33
      xdr/reader.go
  8. 9 17
      xdr/writer.go

+ 28 - 41
protocol/PROTOCOL.md

@@ -25,13 +25,11 @@ Transport and Authentication
 ----------------------------
 
 BEP is deployed as the highest level in a protocol stack, with the lower
-level protocols providing compression, encryption and authentication.
+level protocols providing encryption and authentication.
 
     +-----------------------------|
     |   Block Exchange Protocol   |
     |-----------------------------|
-    |      Compression (LZ4)      |
-    |-----------------------------|
     | Encryption & Auth (TLS 1.2) |
     |-----------------------------|
     |             TCP             |
@@ -62,48 +60,19 @@ requests are received.
 
 The underlying transport protocol MUST be TCP.
 
-Compression
------------
-
-All data is sent within compressed blocks. Blocks are compressed using
-the LZ4 format and algorithm described in
-https://code.google.com/p/lz4/. Each compressed block is preceded by a
-header consisting of three 32 bit words, in network order (big endian):
-
-     0                   1                   2                   3
-     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    |                     Magic (0x0x5e63b278)                      |
-    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    |                          Data Length                          |
-    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    |                   Uncompressed Block Length                   |
-    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    /                                                               /
-    \                        Compressed Data                        \
-    /                                                               /
-    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-
-The Data Length indicates the length of data following the Data Length
-field until the next header, i.e. the length of the Compressed Data
-section plus four bytes for the Uncompressed Block Length field. The
-Uncompressed Block Length indicates the amount of data that will result
-when decompressing the Compressed Data section.
-
-A single BEP message SHOULD be sent as a single compressed block. A
-single compressed block MAY NOT contain more than one BEP message.
-
 Messages
 --------
 
 Every message starts with one 32 bit word indicating the message
-version, type and ID. The header is in network byte order, i.e. big
-endian.
+version, type and ID, followed by the length of the message. The header
+is in network byte order, i.e. big endian.
 
      0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    |  Ver  |       Message ID      |      Type     |    Reserved   |
+    |  Ver  |       Message ID      |      Type     |   Reserved  |C|
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                            Length                             |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 
 For BEP v1 the Version field is set to zero. Future versions with
@@ -125,10 +94,28 @@ The Type field indicates the type of data following the message header
 and is one of the integers defined below. A message of an unknown type
 is a protocol error and MUST result in the connection being terminated.
 
-All data following the message header MUST be in XDR (RFC 1014)
-encoding. All fields shorter than 32 bits and all variable length data
-MUST be padded to a multiple of 32 bits. The actual data types in use by
-BEP, in XDR naming convention, are the following:
+The Compression bit "C" indicates the compression used for the message.
+
+For C=1:
+
+  * The Length field contains the length, in bytes, of the
+    compressed message data.
+
+  * The message data is compressed using the LZ4 format and algorithm
+    described in https://code.google.com/p/lz4/.
+
+For C=0:
+
+  * The Length field contains the length, in bytes, of the
+    uncompressed message data.
+
+  * The message is not compressed.
+
+All data within the the message (post decompression, if compression is
+in use) MUST be in XDR (RFC 1014) encoding. All fields shorter than 32
+bits and all variable length data MUST be padded to a multiple of 32
+bits. The actual data types in use by BEP, in XDR naming convention, are
+the following:
 
  - (unsigned) int   -- (unsigned) 32 bit integer
  - (unsigned) hyper -- (unsigned) 64 bit integer

+ 15 - 2
protocol/counting.go

@@ -7,11 +7,13 @@ package protocol
 import (
 	"io"
 	"sync/atomic"
+	"time"
 )
 
 type countingReader struct {
 	io.Reader
-	tot uint64
+	tot  uint64 // bytes
+	last int64  // unix nanos
 }
 
 var (
@@ -23,6 +25,7 @@ func (c *countingReader) Read(bs []byte) (int, error) {
 	n, err := c.Reader.Read(bs)
 	atomic.AddUint64(&c.tot, uint64(n))
 	atomic.AddUint64(&totalIncoming, uint64(n))
+	atomic.StoreInt64(&c.last, time.Now().UnixNano())
 	return n, err
 }
 
@@ -30,15 +33,21 @@ func (c *countingReader) Tot() uint64 {
 	return atomic.LoadUint64(&c.tot)
 }
 
+func (c *countingReader) Last() time.Time {
+	return time.Unix(0, atomic.LoadInt64(&c.last))
+}
+
 type countingWriter struct {
 	io.Writer
-	tot uint64
+	tot  uint64 // bytes
+	last int64  // unix nanos
 }
 
 func (c *countingWriter) Write(bs []byte) (int, error) {
 	n, err := c.Writer.Write(bs)
 	atomic.AddUint64(&c.tot, uint64(n))
 	atomic.AddUint64(&totalOutgoing, uint64(n))
+	atomic.StoreInt64(&c.last, time.Now().UnixNano())
 	return n, err
 }
 
@@ -46,6 +55,10 @@ func (c *countingWriter) Tot() uint64 {
 	return atomic.LoadUint64(&c.tot)
 }
 
+func (c *countingWriter) Last() time.Time {
+	return time.Unix(0, atomic.LoadInt64(&c.last))
+}
+
 func TotalInOut() (uint64, uint64) {
 	return atomic.LoadUint64(&totalIncoming), atomic.LoadUint64(&totalOutgoing)
 }

+ 14 - 7
protocol/header.go

@@ -7,9 +7,10 @@ package protocol
 import "github.com/calmh/syncthing/xdr"
 
 type header struct {
-	version int
-	msgID   int
-	msgType int
+	version     int
+	msgID       int
+	msgType     int
+	compression bool
 }
 
 func (h header) encodeXDR(xw *xdr.Writer) (int, error) {
@@ -24,15 +25,21 @@ func (h *header) decodeXDR(xr *xdr.Reader) error {
 }
 
 func encodeHeader(h header) uint32 {
+	var isComp uint32
+	if h.compression {
+		isComp = 1 << 0 // the zeroth bit is the compression bit
+	}
 	return uint32(h.version&0xf)<<28 +
 		uint32(h.msgID&0xfff)<<16 +
-		uint32(h.msgType&0xff)<<8
+		uint32(h.msgType&0xff)<<8 +
+		isComp
 }
 
 func decodeHeader(u uint32) header {
 	return header{
-		version: int(u>>28) & 0xf,
-		msgID:   int(u>>16) & 0xfff,
-		msgType: int(u>>8) & 0xff,
+		version:     int(u>>28) & 0xf,
+		msgID:       int(u>>16) & 0xfff,
+		msgType:     int(u>>8) & 0xff,
+		compression: u&1 == 1,
 	}
 }

+ 6 - 0
protocol/message.go

@@ -49,6 +49,10 @@ type RequestMessage struct {
 	Size       uint32
 }
 
+type ResponseMessage struct {
+	Data []byte
+}
+
 type ClusterConfigMessage struct {
 	ClientName    string       // max:64
 	ClientVersion string       // max:64
@@ -75,3 +79,5 @@ type Option struct {
 type CloseMessage struct {
 	Reason string // max:1024
 }
+
+type EmptyMessage struct{}

+ 107 - 0
protocol/message_xdr.go

@@ -348,6 +348,64 @@ func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error {
 
 /*
 
+ResponseMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Length of Data                         |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                    Data (variable length)                     \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct ResponseMessage {
+	opaque Data<>;
+}
+
+*/
+
+func (o ResponseMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o ResponseMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o ResponseMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o ResponseMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	xw.WriteBytes(o.Data)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *ResponseMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *ResponseMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *ResponseMessage) decodeXDR(xr *xdr.Reader) error {
+	o.Data = xr.ReadBytes()
+	return xr.Error()
+}
+
+/*
+
 ClusterConfigMessage Structure:
 
  0                   1                   2                   3
@@ -752,3 +810,52 @@ func (o *CloseMessage) decodeXDR(xr *xdr.Reader) error {
 	o.Reason = xr.ReadStringMax(1024)
 	return xr.Error()
 }
+
+/*
+
+EmptyMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct EmptyMessage {
+}
+
+*/
+
+func (o EmptyMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o EmptyMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o EmptyMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o EmptyMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	return xw.Tot(), xw.Error()
+}
+
+func (o *EmptyMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *EmptyMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *EmptyMessage) decodeXDR(xr *xdr.Reader) error {
+	return xr.Error()
+}

+ 213 - 146
protocol/protocol.go

@@ -6,16 +6,21 @@ package protocol
 
 import (
 	"bufio"
+	"encoding/binary"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"io"
 	"sync"
 	"time"
 
-	"github.com/calmh/syncthing/xdr"
+	lz4 "github.com/bkaradzic/go-lz4"
 )
 
-const BlockSize = 128 * 1024
+const (
+	BlockSize         = 128 * 1024
+	MinCompressedSize = 128 // message must be this big to enable compression
+)
 
 const (
 	messageTypeClusterConfig = 0
@@ -82,21 +87,22 @@ type rawConnection struct {
 	state    int
 
 	cr *countingReader
-	xr *xdr.Reader
 
 	cw *countingWriter
 	wb *bufio.Writer
-	xw *xdr.Writer
 
-	awaiting    []chan asyncResult
+	awaiting    [4096]chan asyncResult
 	awaitingMut sync.Mutex
 
 	idxMut sync.Mutex // ensures serialization of Index calls
 
 	nextID chan int
-	outbox chan []encodable
+	outbox chan hdrMsg
 	closed chan struct{}
 	once   sync.Once
+
+	rdbuf0 []byte // used & reused by readMessage
+	rdbuf1 []byte // used & reused by readMessage
 }
 
 type asyncResult struct {
@@ -104,36 +110,32 @@ type asyncResult struct {
 	err error
 }
 
+type hdrMsg struct {
+	hdr header
+	msg encodable
+}
+
+type encodable interface {
+	AppendXDR([]byte) []byte
+}
+
 const (
 	pingTimeout  = 30 * time.Second
 	pingIdleTime = 60 * time.Second
 )
 
 func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string) Connection {
-	// Byte counters are at the lowest level, counting compressed bytes
 	cr := &countingReader{Reader: reader}
 	cw := &countingWriter{Writer: writer}
 
-	// Compression is just above counting
-	zr := newLZ4Reader(cr)
-	zw := newLZ4Writer(cw)
-
-	// We buffer writes on top of compression.
-	// The LZ4 reader is already internally buffered
-	wb := bufio.NewWriterSize(zw, 65536)
-
 	c := rawConnection{
 		id:       nodeID,
 		name:     name,
 		receiver: nativeModel{receiver},
 		state:    stateInitial,
 		cr:       cr,
-		xr:       xdr.NewReader(zr),
 		cw:       cw,
-		wb:       wb,
-		xw:       xdr.NewWriter(wb),
-		awaiting: make([]chan asyncResult, 0x1000),
-		outbox:   make(chan []encodable),
+		outbox:   make(chan hdrMsg),
 		nextID:   make(chan int),
 		closed:   make(chan struct{}),
 	}
@@ -162,7 +164,7 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) error {
 	default:
 	}
 	c.idxMut.Lock()
-	c.send(header{0, -1, messageTypeIndex}, IndexMessage{repo, idx})
+	c.send(-1, messageTypeIndex, IndexMessage{repo, idx})
 	c.idxMut.Unlock()
 	return nil
 }
@@ -175,7 +177,7 @@ func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error {
 	default:
 	}
 	c.idxMut.Lock()
-	c.send(header{0, -1, messageTypeIndexUpdate}, IndexMessage{repo, idx})
+	c.send(-1, messageTypeIndexUpdate, IndexMessage{repo, idx})
 	c.idxMut.Unlock()
 	return nil
 }
@@ -197,8 +199,7 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
 	c.awaiting[id] = rc
 	c.awaitingMut.Unlock()
 
-	ok := c.send(header{0, id, messageTypeRequest},
-		RequestMessage{repo, name, uint64(offset), uint32(size)})
+	ok := c.send(id, messageTypeRequest, RequestMessage{repo, name, uint64(offset), uint32(size)})
 	if !ok {
 		return nil, ErrClosed
 	}
@@ -212,7 +213,7 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
 
 // ClusterConfig send the cluster configuration message to the peer and returns any error
 func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
-	c.send(header{0, -1, messageTypeClusterConfig}, config)
+	c.send(-1, messageTypeClusterConfig, config)
 }
 
 func (c *rawConnection) ping() bool {
@@ -228,7 +229,7 @@ func (c *rawConnection) ping() bool {
 	c.awaiting[id] = rc
 	c.awaitingMut.Unlock()
 
-	ok := c.send(header{0, id, messageTypePing})
+	ok := c.send(id, messageTypePing, nil)
 	if !ok {
 		return false
 	}
@@ -249,68 +250,53 @@ func (c *rawConnection) readerLoop() (err error) {
 		default:
 		}
 
-		var hdr header
-		hdr.decodeXDR(c.xr)
-		if err := c.xr.Error(); err != nil {
+		hdr, msg, err := c.readMessage()
+		if err != nil {
 			return err
 		}
-		if hdr.version != 0 {
-			return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
-		}
 
 		switch hdr.msgType {
 		case messageTypeIndex:
 			if c.state < stateCCRcvd {
 				return fmt.Errorf("protocol error: index message in state %d", c.state)
 			}
-			if err := c.handleIndex(); err != nil {
-				return err
-			}
+			c.handleIndex(msg.(IndexMessage))
 			c.state = stateIdxRcvd
 
 		case messageTypeIndexUpdate:
 			if c.state < stateIdxRcvd {
 				return fmt.Errorf("protocol error: index update message in state %d", c.state)
 			}
-			if err := c.handleIndexUpdate(); err != nil {
-				return err
-			}
+			c.handleIndexUpdate(msg.(IndexMessage))
 
 		case messageTypeRequest:
 			if c.state < stateIdxRcvd {
 				return fmt.Errorf("protocol error: request message in state %d", c.state)
 			}
-			if err := c.handleRequest(hdr); err != nil {
-				return err
-			}
+			// Requests are handled asynchronously
+			go c.handleRequest(hdr.msgID, msg.(RequestMessage))
 
 		case messageTypeResponse:
 			if c.state < stateIdxRcvd {
 				return fmt.Errorf("protocol error: response message in state %d", c.state)
 			}
-			if err := c.handleResponse(hdr); err != nil {
-				return err
-			}
+			c.handleResponse(hdr.msgID, msg.(ResponseMessage))
 
 		case messageTypePing:
-			c.send(header{0, hdr.msgID, messageTypePong})
+			c.send(hdr.msgID, messageTypePong, EmptyMessage{})
 
 		case messageTypePong:
-			c.handlePong(hdr)
+			c.handlePong(hdr.msgID)
 
 		case messageTypeClusterConfig:
 			if c.state != stateInitial {
 				return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
 			}
-			if err := c.handleClusterConfig(); err != nil {
-				return err
-			}
+			go c.receiver.ClusterConfig(c.id, msg.(ClusterConfigMessage))
 			c.state = stateCCRcvd
 
 		case messageTypeClose:
-			if err := c.handleClose(); err != nil {
-				return err
-			}
+			return errors.New(msg.(CloseMessage).Reason)
 
 		default:
 			return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
@@ -318,114 +304,153 @@ func (c *rawConnection) readerLoop() (err error) {
 	}
 }
 
-func (c *rawConnection) handleIndex() error {
-	var im IndexMessage
-	im.decodeXDR(c.xr)
-	if err := c.xr.Error(); err != nil {
-		return err
+func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
+	if cap(c.rdbuf0) < 8 {
+		c.rdbuf0 = make([]byte, 8)
 	} else {
-		if debug {
-			l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
-		}
-		c.receiver.Index(c.id, im.Repository, im.Files)
+		c.rdbuf0 = c.rdbuf0[:8]
+	}
+	_, err = io.ReadFull(c.cr, c.rdbuf0)
+	if err != nil {
+		return
 	}
-	return nil
-}
 
-func (c *rawConnection) handleIndexUpdate() error {
-	var im IndexMessage
-	im.decodeXDR(c.xr)
-	if err := c.xr.Error(); err != nil {
-		return err
+	hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4]))
+	msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8]))
+
+	if debug {
+		l.Debugf("read header %v (msglen=%d)", hdr, msglen)
+	}
+
+	if cap(c.rdbuf0) < msglen {
+		c.rdbuf0 = make([]byte, msglen)
 	} else {
+		c.rdbuf0 = c.rdbuf0[:msglen]
+	}
+	_, err = io.ReadFull(c.cr, c.rdbuf0)
+	if err != nil {
+		return
+	}
+
+	if debug {
+		l.Debugf("read %d bytes", len(c.rdbuf0))
+	}
+
+	msgBuf := c.rdbuf0
+	if hdr.compression {
+		c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)]
+		c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0)
+		if err != nil {
+			return
+		}
+		msgBuf = c.rdbuf1
 		if debug {
-			l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
+			l.Debugf("decompressed to %d bytes", len(msgBuf))
 		}
-		c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
 	}
-	return nil
-}
 
-func (c *rawConnection) handleRequest(hdr header) error {
-	var req RequestMessage
-	req.decodeXDR(c.xr)
-	if err := c.xr.Error(); err != nil {
-		return err
+	if debug {
+		if len(msgBuf) > 1024 {
+			l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
+		} else {
+			l.Debugf("message data:\n%s", hex.Dump(msgBuf))
+		}
 	}
-	go c.processRequest(hdr.msgID, req)
-	return nil
-}
 
-func (c *rawConnection) handleResponse(hdr header) error {
-	data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
+	switch hdr.msgType {
+	case messageTypeIndex, messageTypeIndexUpdate:
+		var idx IndexMessage
+		err = idx.UnmarshalXDR(msgBuf)
+		msg = idx
 
-	if err := c.xr.Error(); err != nil {
-		return err
-	}
+	case messageTypeRequest:
+		var req RequestMessage
+		err = req.UnmarshalXDR(msgBuf)
+		msg = req
 
-	c.awaitingMut.Lock()
-	if rc := c.awaiting[hdr.msgID]; rc != nil {
-		c.awaiting[hdr.msgID] = nil
-		rc <- asyncResult{data, nil}
-		close(rc)
+	case messageTypeResponse:
+		var resp ResponseMessage
+		err = resp.UnmarshalXDR(msgBuf)
+		msg = resp
+
+	case messageTypePing, messageTypePong:
+		msg = EmptyMessage{}
+
+	case messageTypeClusterConfig:
+		var cc ClusterConfigMessage
+		err = cc.UnmarshalXDR(msgBuf)
+		msg = cc
+
+	case messageTypeClose:
+		var cm CloseMessage
+		err = cm.UnmarshalXDR(msgBuf)
+		msg = cm
+
+	default:
+		err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
 	}
-	c.awaitingMut.Unlock()
 
-	return nil
+	return
 }
 
-func (c *rawConnection) handlePong(hdr header) {
-	c.awaitingMut.Lock()
-	if rc := c.awaiting[hdr.msgID]; rc != nil {
-		c.awaiting[hdr.msgID] = nil
-		rc <- asyncResult{}
-		close(rc)
+func (c *rawConnection) handleIndex(im IndexMessage) {
+	if debug {
+		l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
 	}
-	c.awaitingMut.Unlock()
+	c.receiver.Index(c.id, im.Repository, im.Files)
 }
 
-func (c *rawConnection) handleClusterConfig() error {
-	var cm ClusterConfigMessage
-	cm.decodeXDR(c.xr)
-	if err := c.xr.Error(); err != nil {
-		return err
-	} else {
-		go c.receiver.ClusterConfig(c.id, cm)
+func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
+	if debug {
+		l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
 	}
-	return nil
+	c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
 }
 
-func (c *rawConnection) handleClose() error {
-	var cm CloseMessage
-	cm.decodeXDR(c.xr)
-	if err := c.xr.Error(); err != nil {
-		return err
-	}
-	return errors.New(cm.Reason)
+func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
+	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
+
+	c.send(msgID, messageTypeResponse, ResponseMessage{data})
 }
 
-type encodable interface {
-	encodeXDR(*xdr.Writer) (int, error)
+func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
+	c.awaitingMut.Lock()
+	if rc := c.awaiting[msgID]; rc != nil {
+		c.awaiting[msgID] = nil
+		rc <- asyncResult{resp.Data, nil}
+		close(rc)
+	}
+	c.awaitingMut.Unlock()
 }
-type encodableBytes []byte
 
-func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
-	return xw.WriteBytes(e)
+func (c *rawConnection) handlePong(msgID int) {
+	c.awaitingMut.Lock()
+	if rc := c.awaiting[msgID]; rc != nil {
+		c.awaiting[msgID] = nil
+		rc <- asyncResult{}
+		close(rc)
+	}
+	c.awaitingMut.Unlock()
 }
 
-func (c *rawConnection) send(h header, es ...encodable) bool {
-	if h.msgID < 0 {
+func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
+	if msgID < 0 {
 		select {
 		case id := <-c.nextID:
-			h.msgID = id
+			msgID = id
 		case <-c.closed:
 			return false
 		}
 	}
-	msg := append([]encodable{h}, es...)
+
+	hdr := header{
+		version: 0,
+		msgID:   msgID,
+		msgType: msgType,
+	}
 
 	select {
-	case c.outbox <- msg:
+	case c.outbox <- hdrMsg{hdr, msg}:
 		return true
 	case <-c.closed:
 		return false
@@ -433,13 +458,71 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
 }
 
 func (c *rawConnection) writerLoop() {
+	var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
+	var uncBuf []byte            // buffer for uncompressed message, kept and reused
 	for {
+		var tempBuf []byte
+		var err error
+
 		select {
-		case es := <-c.outbox:
-			for _, e := range es {
-				e.encodeXDR(c.xw)
+		case hm := <-c.outbox:
+			if hm.msg != nil {
+				// Uncompressed message in uncBuf
+				uncBuf = hm.msg.AppendXDR(uncBuf[:0])
+
+				if len(uncBuf) >= MinCompressedSize {
+					// Use compression for large messages
+					hm.hdr.compression = true
+
+					// Make sure we have enough space for the compressed message plus header in msgBug
+					msgBuf = msgBuf[:cap(msgBuf)]
+					if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
+						msgBuf = make([]byte, maxLen)
+					}
+
+					// Compressed is written to msgBuf, we keep tb for the length only
+					tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
+					binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
+					msgBuf = msgBuf[0 : len(tempBuf)+8]
+
+					if debug {
+						l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
+					}
+				} else {
+					// No point in compressing very short messages
+					hm.hdr.compression = false
+
+					msgBuf = msgBuf[:cap(msgBuf)]
+					if l := len(uncBuf) + 8; l > len(msgBuf) {
+						msgBuf = make([]byte, l)
+					}
+
+					binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
+					msgBuf = msgBuf[0 : len(uncBuf)+8]
+					copy(msgBuf[8:], uncBuf)
+
+					if debug {
+						l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
+					}
+				}
+			} else {
+				if debug {
+					l.Debugf("write empty message; %v", hm.hdr)
+				}
+				binary.BigEndian.PutUint32(msgBuf[4:8], 0)
+				msgBuf = msgBuf[:8]
 			}
-			if err := c.flush(); err != nil {
+
+			binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
+
+			if err == nil {
+				var n int
+				n, err = c.cw.Write(msgBuf)
+				if debug {
+					l.Debugf("wrote %d bytes on the wire", n)
+				}
+			}
+			if err != nil {
 				c.close(err)
 				return
 			}
@@ -449,16 +532,6 @@ func (c *rawConnection) writerLoop() {
 	}
 }
 
-func (c *rawConnection) flush() error {
-	if err := c.xw.Error(); err != nil {
-		return err
-	}
-	if err := c.wb.Flush(); err != nil {
-		return err
-	}
-	return nil
-}
-
 func (c *rawConnection) close(err error) {
 	c.once.Do(func() {
 		close(c.closed)
@@ -494,13 +567,13 @@ func (c *rawConnection) pingerLoop() {
 	for {
 		select {
 		case <-ticker:
-			if d := time.Since(c.xr.LastRead()); d < pingIdleTime {
+			if d := time.Since(c.cr.Last()); d < pingIdleTime {
 				if debug {
 					l.Debugln(c.id, "ping skipped after rd", d)
 				}
 				continue
 			}
-			if d := time.Since(c.xw.LastWrite()); d < pingIdleTime {
+			if d := time.Since(c.cw.Last()); d < pingIdleTime {
 				if debug {
 					l.Debugln(c.id, "ping skipped after wr", d)
 				}
@@ -532,12 +605,6 @@ func (c *rawConnection) pingerLoop() {
 	}
 }
 
-func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
-	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
-
-	c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
-}
-
 type Statistics struct {
 	At            time.Time
 	InBytesTotal  uint64

+ 23 - 33
xdr/reader.go

@@ -7,18 +7,15 @@ package xdr
 import (
 	"errors"
 	"io"
-	"time"
 )
 
 var ErrElementSizeExceeded = errors.New("element size exceeded")
 
 type Reader struct {
-	r    io.Reader
-	tot  int
-	err  error
-	b    [8]byte
-	sb   []byte
-	last time.Time
+	r   io.Reader
+	err error
+	b   [8]byte
+	sb  []byte
 }
 
 func NewReader(r io.Reader) *Reader {
@@ -63,8 +60,6 @@ func (r *Reader) ReadBytesMaxInto(max int, dst []byte) []byte {
 	if r.err != nil {
 		return nil
 	}
-	r.last = time.Now()
-	s := r.tot
 
 	l := int(r.ReadUint32())
 	if r.err != nil {
@@ -85,17 +80,16 @@ func (r *Reader) ReadBytesMaxInto(max int, dst []byte) []byte {
 	n, r.err = io.ReadFull(r.r, dst)
 	if r.err != nil {
 		if debug {
-			dl.Debugf("@0x%x: rd bytes (%d): %v", s, len(dst), r.err)
+			dl.Debugf("rd bytes (%d): %v", len(dst), r.err)
 		}
 		return nil
 	}
-	r.tot += n
 
 	if debug {
 		if n > maxDebugBytes {
-			dl.Debugf("@0x%x: rd bytes (%d): %x...", s, len(dst), dst[:maxDebugBytes])
+			dl.Debugf("rd bytes (%d): %x...", len(dst), dst[:maxDebugBytes])
 		} else {
-			dl.Debugf("@0x%x: rd bytes (%d): %x", s, len(dst), dst)
+			dl.Debugf("rd bytes (%d): %x", len(dst), dst)
 		}
 	}
 	return dst[:l]
@@ -113,15 +107,11 @@ func (r *Reader) ReadUint32() uint32 {
 	if r.err != nil {
 		return 0
 	}
-	r.last = time.Now()
-	s := r.tot
 
-	var n int
-	n, r.err = io.ReadFull(r.r, r.b[:4])
-	r.tot += n
+	_, r.err = io.ReadFull(r.r, r.b[:4])
 	if r.err != nil {
 		if debug {
-			dl.Debugf("@0x%x: rd uint32: %v", r.tot, r.err)
+			dl.Debugf("rd uint32: %v", r.err)
 		}
 		return 0
 	}
@@ -129,7 +119,7 @@ func (r *Reader) ReadUint32() uint32 {
 	v := uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24
 
 	if debug {
-		dl.Debugf("@0x%x: rd uint32=%d (0x%08x)", s, v, v)
+		dl.Debugf("rd uint32=%d (0x%08x)", v, v)
 	}
 	return v
 }
@@ -138,15 +128,11 @@ func (r *Reader) ReadUint64() uint64 {
 	if r.err != nil {
 		return 0
 	}
-	r.last = time.Now()
-	s := r.tot
 
-	var n int
-	n, r.err = io.ReadFull(r.r, r.b[:8])
-	r.tot += n
+	_, r.err = io.ReadFull(r.r, r.b[:8])
 	if r.err != nil {
 		if debug {
-			dl.Debugf("@0x%x: rd uint64: %v", r.tot, r.err)
+			dl.Debugf("rd uint64: %v", r.err)
 		}
 		return 0
 	}
@@ -155,19 +141,23 @@ func (r *Reader) ReadUint64() uint64 {
 		uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56
 
 	if debug {
-		dl.Debugf("@0x%x: rd uint64=%d (0x%016x)", s, v, v)
+		dl.Debugf("rd uint64=%d (0x%016x)", v, v)
 	}
 	return v
 }
 
-func (r *Reader) Tot() int {
-	return r.tot
+type XDRError struct {
+	op  string
+	err error
 }
 
-func (r *Reader) Error() error {
-	return r.err
+func (e XDRError) Error() string {
+	return "xdr " + e.op + ": " + e.err.Error()
 }
 
-func (r *Reader) LastRead() time.Time {
-	return r.last
+func (r *Reader) Error() error {
+	if r.err == nil {
+		return nil
+	}
+	return XDRError{"read", r.err}
 }

+ 9 - 17
xdr/writer.go

@@ -4,10 +4,7 @@
 
 package xdr
 
-import (
-	"io"
-	"time"
-)
+import "io"
 
 func pad(l int) int {
 	d := l % 4
@@ -20,11 +17,10 @@ func pad(l int) int {
 var padBytes = []byte{0, 0, 0}
 
 type Writer struct {
-	w    io.Writer
-	tot  int
-	err  error
-	b    [8]byte
-	last time.Time
+	w   io.Writer
+	tot int
+	err error
+	b   [8]byte
 }
 
 type AppendWriter []byte
@@ -49,7 +45,6 @@ func (w *Writer) WriteBytes(bs []byte) (int, error) {
 		return 0, w.err
 	}
 
-	w.last = time.Now()
 	w.WriteUint32(uint32(len(bs)))
 	if w.err != nil {
 		return 0, w.err
@@ -93,7 +88,6 @@ func (w *Writer) WriteUint32(v uint32) (int, error) {
 		return 0, w.err
 	}
 
-	w.last = time.Now()
 	if debug {
 		dl.Debugf("wr uint32=%d", v)
 	}
@@ -114,7 +108,6 @@ func (w *Writer) WriteUint64(v uint64) (int, error) {
 		return 0, w.err
 	}
 
-	w.last = time.Now()
 	if debug {
 		dl.Debugf("wr uint64=%d", v)
 	}
@@ -139,9 +132,8 @@ func (w *Writer) Tot() int {
 }
 
 func (w *Writer) Error() error {
-	return w.err
-}
-
-func (w *Writer) LastWrite() time.Time {
-	return w.last
+	if w.err == nil {
+		return nil
+	}
+	return XDRError{"write", w.err}
 }