|
@@ -81,7 +81,7 @@ type Model interface {
|
|
|
// An index update was received from the peer device
|
|
|
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
|
|
|
// A request was made by the peer device
|
|
|
- Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
|
|
|
+ Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
|
|
|
// A cluster configuration message was received
|
|
|
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
|
|
|
// The peer device closed the connection
|
|
@@ -112,11 +112,11 @@ type rawConnection struct {
|
|
|
|
|
|
idxMut sync.Mutex // ensures serialization of Index calls
|
|
|
|
|
|
- nextID chan int
|
|
|
- outbox chan hdrMsg
|
|
|
- closed chan struct{}
|
|
|
- once sync.Once
|
|
|
-
|
|
|
+ nextID chan int
|
|
|
+ outbox chan hdrMsg
|
|
|
+ closed chan struct{}
|
|
|
+ once sync.Once
|
|
|
+ pool sync.Pool
|
|
|
compression Compression
|
|
|
|
|
|
rdbuf0 []byte // used & reused by readMessage
|
|
@@ -129,8 +129,9 @@ type asyncResult struct {
|
|
|
}
|
|
|
|
|
|
type hdrMsg struct {
|
|
|
- hdr header
|
|
|
- msg encodable
|
|
|
+ hdr header
|
|
|
+ msg encodable
|
|
|
+ done chan struct{}
|
|
|
}
|
|
|
|
|
|
type encodable interface {
|
|
@@ -151,14 +152,19 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
|
|
|
cw := &countingWriter{Writer: writer}
|
|
|
|
|
|
c := rawConnection{
|
|
|
- id: deviceID,
|
|
|
- name: name,
|
|
|
- receiver: nativeModel{receiver},
|
|
|
- cr: cr,
|
|
|
- cw: cw,
|
|
|
- outbox: make(chan hdrMsg),
|
|
|
- nextID: make(chan int),
|
|
|
- closed: make(chan struct{}),
|
|
|
+ id: deviceID,
|
|
|
+ name: name,
|
|
|
+ receiver: nativeModel{receiver},
|
|
|
+ cr: cr,
|
|
|
+ cw: cw,
|
|
|
+ outbox: make(chan hdrMsg),
|
|
|
+ nextID: make(chan int),
|
|
|
+ closed: make(chan struct{}),
|
|
|
+ pool: sync.Pool{
|
|
|
+ New: func() interface{} {
|
|
|
+ return make([]byte, BlockSize)
|
|
|
+ },
|
|
|
+ },
|
|
|
compression: compress,
|
|
|
}
|
|
|
|
|
@@ -195,7 +201,7 @@ func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, optio
|
|
|
Files: idx,
|
|
|
Flags: flags,
|
|
|
Options: options,
|
|
|
- })
|
|
|
+ }, nil)
|
|
|
c.idxMut.Unlock()
|
|
|
return nil
|
|
|
}
|
|
@@ -213,7 +219,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32,
|
|
|
Files: idx,
|
|
|
Flags: flags,
|
|
|
Options: options,
|
|
|
- })
|
|
|
+ }, nil)
|
|
|
c.idxMut.Unlock()
|
|
|
return nil
|
|
|
}
|
|
@@ -243,7 +249,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
|
|
|
Hash: hash,
|
|
|
Flags: flags,
|
|
|
Options: options,
|
|
|
- })
|
|
|
+ }, nil)
|
|
|
if !ok {
|
|
|
return nil, ErrClosed
|
|
|
}
|
|
@@ -257,7 +263,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
|
|
|
|
|
|
// ClusterConfig send the cluster configuration message to the peer and returns any error
|
|
|
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
|
|
|
- c.send(-1, messageTypeClusterConfig, config)
|
|
|
+ c.send(-1, messageTypeClusterConfig, config, nil)
|
|
|
}
|
|
|
|
|
|
func (c *rawConnection) ping() bool {
|
|
@@ -273,7 +279,7 @@ func (c *rawConnection) ping() bool {
|
|
|
c.awaiting[id] = rc
|
|
|
c.awaitingMut.Unlock()
|
|
|
|
|
|
- ok := c.send(id, messageTypePing, nil)
|
|
|
+ ok := c.send(id, messageTypePing, nil, nil)
|
|
|
if !ok {
|
|
|
return false
|
|
|
}
|
|
@@ -342,7 +348,7 @@ func (c *rawConnection) readerLoop() (err error) {
|
|
|
if state != stateReady {
|
|
|
return fmt.Errorf("protocol error: ping message in state %d", state)
|
|
|
}
|
|
|
- c.send(hdr.msgID, messageTypePong, pongMessage{})
|
|
|
+ c.send(hdr.msgID, messageTypePong, pongMessage{}, nil)
|
|
|
|
|
|
case pongMessage:
|
|
|
if state != stateReady {
|
|
@@ -519,12 +525,36 @@ func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
|
|
|
}
|
|
|
|
|
|
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
|
|
|
- data, err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size), req.Hash, req.Flags, req.Options)
|
|
|
+ size := int(req.Size)
|
|
|
+ usePool := size <= BlockSize
|
|
|
|
|
|
- c.send(msgID, messageTypeResponse, ResponseMessage{
|
|
|
- Data: data,
|
|
|
- Code: errorToCode(err),
|
|
|
- })
|
|
|
+ var buf []byte
|
|
|
+ var done chan struct{}
|
|
|
+
|
|
|
+ if usePool {
|
|
|
+ buf = c.pool.Get().([]byte)[:size]
|
|
|
+ done = make(chan struct{})
|
|
|
+ } else {
|
|
|
+ buf = make([]byte, size)
|
|
|
+ }
|
|
|
+
|
|
|
+ err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
|
|
|
+ if err != nil {
|
|
|
+ c.send(msgID, messageTypeResponse, ResponseMessage{
|
|
|
+ Data: nil,
|
|
|
+ Code: errorToCode(err),
|
|
|
+ }, done)
|
|
|
+ } else {
|
|
|
+ c.send(msgID, messageTypeResponse, ResponseMessage{
|
|
|
+ Data: buf,
|
|
|
+ Code: errorToCode(err),
|
|
|
+ }, done)
|
|
|
+ }
|
|
|
+
|
|
|
+ if usePool {
|
|
|
+ <-done
|
|
|
+ c.pool.Put(buf)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
|
|
@@ -547,7 +577,7 @@ func (c *rawConnection) handlePong(msgID int) {
|
|
|
c.awaitingMut.Unlock()
|
|
|
}
|
|
|
|
|
|
-func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
|
|
|
+func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
|
|
|
if msgID < 0 {
|
|
|
select {
|
|
|
case id := <-c.nextID:
|
|
@@ -564,7 +594,7 @@ func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
|
|
|
}
|
|
|
|
|
|
select {
|
|
|
- case c.outbox <- hdrMsg{hdr, msg}:
|
|
|
+ case c.outbox <- hdrMsg{hdr, msg, done}:
|
|
|
return true
|
|
|
case <-c.closed:
|
|
|
return false
|
|
@@ -583,6 +613,9 @@ func (c *rawConnection) writerLoop() {
|
|
|
if hm.msg != nil {
|
|
|
// Uncompressed message in uncBuf
|
|
|
uncBuf, err = hm.msg.AppendXDR(uncBuf[:0])
|
|
|
+ if hm.done != nil {
|
|
|
+ close(hm.done)
|
|
|
+ }
|
|
|
if err != nil {
|
|
|
c.close(err)
|
|
|
return
|