Browse Source

Restructure protocol code with less locking

Jakob Borg 11 years ago
parent
commit
1a1f118f1a

+ 3 - 0
integration/h1/config.xml

@@ -9,6 +9,9 @@
         <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3">
             <address>127.0.0.1:22003</address>
         </node>
+        <node id="EJHMPAQOGCVORISB4IS3SYYVJXTKJGLTU66DIQPGJ5D2GXGQ3OWQ" name="s4">
+            <address>127.0.0.1:22004</address>
+        </node>
     </repository>
     <repository id="s12" directory="s12-1">
         <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1">

+ 23 - 0
integration/h4/cert.pem

@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIID3jCCAkigAwIBAgIBADALBgkqhkiG9w0BAQswFDESMBAGA1UEAxMJc3luY3Ro
+aW5nMB4XDTE0MDUxMDAwNTM0N1oXDTQ5MTIzMTIzNTk1OVowFDESMBAGA1UEAxMJ
+c3luY3RoaW5nMIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA9MRyBtAr
+Sjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4brfUJLslcQHowqaAprOU1NP+
+BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtInJksk4Kp6nJgfyR7TqeQgqxtv
++skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaGm3qOt81YvSg37R75HiOCzv+h
+FcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfnH/2204YNiQUne/8h2fgtkpxy
+OjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZmE+i/AsZcTsk0fgefe+bshWG/
+hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsShGqVuwfJkTqkjAXX5wcH+PgJ
+ewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM1arwT0STcMc+4fdrVB09lX6q
+NJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2Ko7e2GnWCQeijGmnAgMBAAGj
+PzA9MA4GA1UdDwEB/wQEAwIAoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUH
+AwIwDAYDVR0TAQH/BAIwADALBgkqhkiG9w0BAQsDggGBANFiHcATP5Lm11o65wbh
+sKk7yteTapRohMoLNdW44YNyM8ZkELnrdNY8pe3CWSGy3spBH01+4jbUT+gSltQr
+KTLVxSZ7f91696Og5ag4BQCeFY6ghKD/G9+PlBSj6yb3Y98NZsx8huLfylH+XuJw
+2gP5Nqov4uXaKgYylx2gdaeCb2M+wM/br1DO2HCPCmgbZE5g8RM5JxzojGn/41Le
+IbCd39zdI6NKj9c7T1Bxmt20uzca4nRgXVVzJymedEoF+//sBRk6PQzqgjgn/r3S
+h9vrqo5j8ly/+ojFjBaVY7gq2XHM6/q0LTjeKkv2MUQw+vEEZX65GpBOgBZ8U0Wb
+/NMUUhhDjGE/0G6TCJgq/HdkjmsNaWjO5sWjhnwXNImYXBdH4OenhXIrHcLhcnxN
+2n5sPkDc6n0LVVV7VAjBPXcTmu2uOSK02yqNZLLWJygp1Wl6lbiqLS3bJgYrUv2m
+YkRaR+IqVPw5EPs/QlH0qLBeCyIasaSWUVZeitVwRmqIUA==
+-----END CERTIFICATE-----

+ 36 - 0
integration/h4/config.xml

@@ -0,0 +1,36 @@
+<configuration version="2">
+    <repository id="unique" directory="s4" ro="false">
+        <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"></node>
+        <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"></node>
+        <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"></node>
+        <node id="EJHMPAQOGCVORISB4IS3SYYVJXTKJGLTU66DIQPGJ5D2GXGQ3OWQ" name="s4"></node>
+    </repository>
+    <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1">
+        <address>127.0.0.1:22001</address>
+    </node>
+    <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2">
+        <address>127.0.0.1:22002</address>
+    </node>
+    <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3">
+        <address>127.0.0.1:22003</address>
+    </node>
+    <node id="EJHMPAQOGCVORISB4IS3SYYVJXTKJGLTU66DIQPGJ5D2GXGQ3OWQ" name="s4">
+        <address>dynamic</address>
+    </node>
+    <gui enabled="true">
+        <address>127.0.0.1:8084</address>
+    </gui>
+    <options>
+        <listenAddress>:22004</listenAddress>
+        <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer>
+        <globalAnnounceEnabled>false</globalAnnounceEnabled>
+        <localAnnounceEnabled>true</localAnnounceEnabled>
+        <parallelRequests>16</parallelRequests>
+        <maxSendKbps>0</maxSendKbps>
+        <rescanIntervalS>60</rescanIntervalS>
+        <reconnectionIntervalS>10</reconnectionIntervalS>
+        <maxChangeKbps>1000</maxChangeKbps>
+        <startBrowser>false</startBrowser>
+        <upnpEnabled>false</upnpEnabled>
+    </options>
+</configuration>

+ 39 - 0
integration/h4/key.pem

@@ -0,0 +1,39 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIG5AIBAAKCAYEA9MRyBtArSjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4
+brfUJLslcQHowqaAprOU1NP+BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtIn
+Jksk4Kp6nJgfyR7TqeQgqxtv+skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaG
+m3qOt81YvSg37R75HiOCzv+hFcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfn
+H/2204YNiQUne/8h2fgtkpxyOjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZm
+E+i/AsZcTsk0fgefe+bshWG/hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsS
+hGqVuwfJkTqkjAXX5wcH+PgJewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM
+1arwT0STcMc+4fdrVB09lX6qNJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2
+Ko7e2GnWCQeijGmnAgMBAAECggGBAIjKaLdqC2d3CCqQonJH3q0hsaCsC9wlL9L2
+UmbzfKCkQq0WTNUDo2nLtUcMvBpclzWS0zCGMUYtH7Kyh3bclTigKqKpsJnQiA6i
+VNEW4jOCDp//HqYGBNwSKmftlIX/1mbx+VfnA5PyYR5LsivXb5TX4iOpAKL+Obdf
+dF/zJGIEJ5GrvNqTicMq3dcI7Qh18N9pFSe+MTZLKK0Y9Yetx0hgaTNL0AYEZtcg
+uYMmCvZ4J+Namo6EanKYTmQvHzvq/tZVMvud9Gcr6uKKtVBcgex9S/R7IicaKg78
+oDTgH0nDrpI55pZCX8vuVGk8nVTXXLTsMR1XojOpiYjS6ucfTkPEw3fOW/YRhHg5
+93TrdDiWkqSWube5LNUF87q65t/aw/y2EH2aTNqcPD5OQ+EZRS8OGYPqOrJ4Ycbp
+j6CMSE+LX2IDMQyJ+9J0vPHtFsAviBKQkPoQ1L6mvhJuw6ksy34NQGykNDHz7nQK
+SeqvCJ6XCtaWNkq+00lC3UFaGsjuUQKBwQD8+y370co5G7G5GDLbLE3i+pguUN7L
+5YfDj5qqsM9hOJNqeKAHrKFP2ii0F9WxGw/ruY0k8k7zUt6LepgwkCI5BYfckRKJ
+g8YsNTizjqPLRGtiqL9Garjo+xPxFGj+TkTg9fYD4xTWFa1I15zzCu7Ye7xObeEH
+LRtcm3R4fU54JDrKtKDccoQmTEAzsxRdNXi9ifc7qgjGBH9W02guuGPY4ltT1aZR
+bcO5vpi44Fnl2h6d7N6iwCtFJ0CaT1pAZ4UCgcEA97Asf5DTDWKByZBhk+VvuT1b
+6nMYjqKxDNMmCaomCmk8Mif0w9SEJmAg0b/gbs/H6T78a+9WjbN5q9xHcDU91uax
+TdCenTq7H981AjgUG7OA7XwYn+AKy+hGSnsTJglMJzJm6TGt+Sq0oO9EahBRDlsP
+PiQRot2gyQfubwcl3rhdErRwaCM92BUyPkC2fy2OppAeZOOxxuzxrvHflDOuDGCZ
+KPCmy6U9HV0JOAO2FSNJeZdNLBixXa1Pk8TgbLY7AoHBAPG7lhn9Qg3Fz9H9NINH
+13jfWdFQB0SwJEWTEAiwgMj2ha6Eau5KX63s2V4VNGVSZakqmZtHSneppOuEjq5A
+2+K+zS7PFPaACzos9OxmjU7rJu2UL4m66sv9NvXzOcxev+RyQs0+DKfw+K8VEG0Q
+8l+8BJiw2AjCalXYWbfUjMmyXNdbOCbN6kaqL+L26KuUL7Z1gd/qPw3wODmgMvoJ
+yabxzLDUA2PlzdPMMyTdhCllfkILmEXN+MrQkiOhVa0a/QKBwGZjAhH9ePD4fnQm
+5d8wIb3uGlfRGh6kLBIEGp42IqF9HPASykBFUhdW91odOhY0eAv4CHpJpnrO7QXY
++gLtT1HNbQ+gpGCUTZQAPbZcHhvRWQNSoA8+mtftfVj+hUzc3Qj68cWFzsfIGoDI
+R3ycoBUSGTvzxwKPIQ7Y43wr9UCa74Zy5mB16POw12MadxYda/F4c8f6w5taiRFr
+VKO7tT/Skp101U4rURcZRV1NU3BrdMz5eWI4FuGFafbIlIj7zwKBwHCt3VQt+JmZ
+OhCJR+8Q+jT0JvnMu1zi4CcMRiT8FbNdZDY/3B0wG4ySTNrEikFzIjihF4zIp2nv
+nD3qKQs+THl51GA8AnP9bNk7hknD7rXUuScndccTW58+PGrjqfwJp/1MEeOJQpoX
+0JML1w+dIKHzsKN0X6UL7Gyq8m+0SJKmQQguan3d3M8CMpnW0srgqOfJ+q1+bz8b
+6FuJeijoaN8+zyKkN+9R91Erw5pk+7vJRzEpDtkhprEE5tLNDKrXJw==
+-----END RSA PRIVATE KEY-----

+ 1 - 1
integration/test.sh

@@ -14,7 +14,7 @@ go build json.go
 
 start() {
 	echo "Starting..."
-	for i in 1 2 3 ; do
+	for i in 1 2 3 4 ; do
 		STPROFILER=":909$i" syncthing -home "h$i" &
 	done
 }

+ 250 - 205
protocol/protocol.go

@@ -9,7 +9,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/calmh/syncthing/buffers"
 	"github.com/calmh/syncthing/xdr"
 )
 
@@ -72,16 +71,18 @@ type rawConnection struct {
 	xr     *xdr.Reader
 	writer io.WriteCloser
 
-	cw     *countingWriter
-	wb     *bufio.Writer
-	xw     *xdr.Writer
-	wmut   sync.Mutex
-	closed bool
+	cw   *countingWriter
+	wb   *bufio.Writer
+	xw   *xdr.Writer
+	wmut sync.Mutex
 
-	awaiting  map[int]chan asyncResult
-	nextID    int
 	indexSent map[string]map[string][2]int64
+	awaiting  []chan asyncResult
 	imut      sync.Mutex
+
+	nextID chan int
+	outbox chan []encodable
+	closed chan struct{}
 }
 
 type asyncResult struct {
@@ -115,12 +116,17 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
 		cw:        cw,
 		wb:        wb,
 		xw:        xdr.NewWriter(wb),
-		awaiting:  make(map[int]chan asyncResult),
+		awaiting:  make([]chan asyncResult, 0x1000),
 		indexSent: make(map[string]map[string][2]int64),
+		outbox:    make(chan []encodable),
+		nextID:    make(chan int),
+		closed:    make(chan struct{}),
 	}
 
 	go c.readerLoop()
+	go c.writerLoop()
 	go c.pingerLoop()
+	go c.idGenerator()
 
 	return wireFormatConnection{&c}
 }
@@ -131,10 +137,6 @@ func (c *rawConnection) ID() string {
 
 // Index writes the list of file information to the connected peer node
 func (c *rawConnection) Index(repo string, idx []FileInfo) {
-	if c.isClosed() {
-		return
-	}
-
 	c.imut.Lock()
 	var msgType int
 	if c.indexSent[repo] == nil {
@@ -157,48 +159,32 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
 		}
 		idx = diff
 	}
-
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
 	c.imut.Unlock()
 
-	c.wmut.Lock()
-	header{0, id, msgType}.encodeXDR(c.xw)
-	IndexMessage{repo, idx}.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
-
-	if err != nil {
-		c.close(err)
-		return
-	}
+	c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
 }
 
 // Request returns the bytes for the specified block after fetching them from the connected peer.
 func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
-	if c.isClosed() {
+	var id int
+	select {
+	case id = <-c.nextID:
+	case <-c.closed:
 		return nil, ErrClosed
 	}
 
 	c.imut.Lock()
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
-	rc := make(chan asyncResult)
-	if _, ok := c.awaiting[id]; ok {
+	if ch := c.awaiting[id]; ch != nil {
 		panic("id taken")
 	}
+	rc := make(chan asyncResult)
 	c.awaiting[id] = rc
 	c.imut.Unlock()
 
-	c.wmut.Lock()
-	header{0, id, messageTypeRequest}.encodeXDR(c.xw)
-	RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
-
-	if err != nil {
-		c.close(err)
-		return nil, err
+	ok := c.send(header{0, id, messageTypeRequest},
+		RequestMessage{repo, name, uint64(offset), uint32(size)})
+	if !ok {
+		return nil, ErrClosed
 	}
 
 	res, ok := <-rc
@@ -210,45 +196,24 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
 
 // ClusterConfig send the cluster configuration message to the peer and returns any error
 func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
-	if c.isClosed() {
-		return
-	}
-
-	c.imut.Lock()
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, messageTypeClusterConfig}.encodeXDR(c.xw)
-	config.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
-
-	if err != nil {
-		c.close(err)
-	}
+	c.send(header{0, -1, messageTypeClusterConfig}, config)
 }
 
 func (c *rawConnection) ping() bool {
-	if c.isClosed() {
+	var id int
+	select {
+	case id = <-c.nextID:
+	case <-c.closed:
 		return false
 	}
 
-	c.imut.Lock()
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
 	rc := make(chan asyncResult, 1)
+	c.imut.Lock()
 	c.awaiting[id] = rc
 	c.imut.Unlock()
 
-	c.wmut.Lock()
-	header{0, id, messageTypePing}.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
-
-	if err != nil {
-		c.close(err)
+	ok := c.send(header{0, id, messageTypePing})
+	if !ok {
 		return false
 	}
 
@@ -256,180 +221,251 @@ func (c *rawConnection) ping() bool {
 	return ok && res.err == nil
 }
 
-type flusher interface {
-	Flush() error
+func (c *rawConnection) readerLoop() (err error) {
+	defer func() {
+		c.close(err)
+	}()
+
+	for {
+		select {
+		case <-c.closed:
+			return ErrClosed
+		default:
+		}
+
+		var hdr header
+		hdr.decodeXDR(c.xr)
+		if err := c.xr.Error(); err != nil {
+			return err
+		}
+		if hdr.version != 0 {
+			return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
+		}
+
+		switch hdr.msgType {
+		case messageTypeIndex:
+			if err := c.handleIndex(); err != nil {
+				return err
+			}
+
+		case messageTypeIndexUpdate:
+			if err := c.handleIndexUpdate(); err != nil {
+				return err
+			}
+
+		case messageTypeRequest:
+			if err := c.handleRequest(hdr); err != nil {
+				return err
+			}
+
+		case messageTypeResponse:
+			if err := c.handleResponse(hdr); err != nil {
+				return err
+			}
+
+		case messageTypePing:
+			c.send(header{0, hdr.msgID, messageTypePong})
+
+		case messageTypePong:
+			c.handlePong(hdr)
+
+		case messageTypeClusterConfig:
+			if err := c.handleClusterConfig(); err != nil {
+				return err
+			}
+
+		default:
+			return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
+		}
+	}
 }
 
-func (c *rawConnection) flush() error {
-	if err := c.xw.Error(); err != nil {
+func (c *rawConnection) handleIndex() error {
+	var im IndexMessage
+	im.decodeXDR(c.xr)
+	if err := c.xr.Error(); err != nil {
 		return err
+	} else {
+
+		// We run this (and the corresponding one for update, below)
+		// in a separate goroutine to avoid blocking the read loop.
+		// There is otherwise a potential deadlock where both sides
+		// has the model locked because it's sending a large index
+		// update and can't receive the large index update from the
+		// other side.
+
+		go c.receiver.Index(c.id, im.Repository, im.Files)
 	}
+	return nil
+}
 
-	if err := c.wb.Flush(); err != nil {
+func (c *rawConnection) handleIndexUpdate() error {
+	var im IndexMessage
+	im.decodeXDR(c.xr)
+	if err := c.xr.Error(); err != nil {
 		return err
+	} else {
+		go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
 	}
+	return nil
+}
 
-	if f, ok := c.writer.(flusher); ok {
-		return f.Flush()
+func (c *rawConnection) handleRequest(hdr header) error {
+	var req RequestMessage
+	req.decodeXDR(c.xr)
+	if err := c.xr.Error(); err != nil {
+		return err
+	}
+	go c.processRequest(hdr.msgID, req)
+	return nil
+}
+
+func (c *rawConnection) handleResponse(hdr header) error {
+	data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
+
+	if err := c.xr.Error(); err != nil {
+		return err
 	}
 
+	go func(hdr header, err error) {
+		c.imut.Lock()
+		rc := c.awaiting[hdr.msgID]
+		c.awaiting[hdr.msgID] = nil
+		c.imut.Unlock()
+
+		if rc != nil {
+			rc <- asyncResult{data, err}
+			close(rc)
+		}
+	}(hdr, c.xr.Error())
+
 	return nil
 }
 
-func (c *rawConnection) close(err error) {
+func (c *rawConnection) handlePong(hdr header) {
 	c.imut.Lock()
-	c.wmut.Lock()
-	defer c.imut.Unlock()
-	defer c.wmut.Unlock()
+	if rc := c.awaiting[hdr.msgID]; rc != nil {
+		go func() {
+			rc <- asyncResult{}
+			close(rc)
+		}()
 
-	if c.closed {
-		return
+		c.awaiting[hdr.msgID] = nil
 	}
+	c.imut.Unlock()
+}
 
-	c.closed = true
-
-	for _, ch := range c.awaiting {
-		close(ch)
+func (c *rawConnection) handleClusterConfig() error {
+	var cm ClusterConfigMessage
+	cm.decodeXDR(c.xr)
+	if err := c.xr.Error(); err != nil {
+		return err
+	} else {
+		go c.receiver.ClusterConfig(c.id, cm)
 	}
-	c.awaiting = nil
-	c.writer.Close()
-	c.reader.Close()
+	return nil
+}
 
-	c.receiver.Close(c.id, err)
+type encodable interface {
+	encodeXDR(*xdr.Writer) (int, error)
 }
+type encodableBytes []byte
 
-func (c *rawConnection) isClosed() bool {
-	c.wmut.Lock()
-	defer c.wmut.Unlock()
-	return c.closed
+func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
+	return xw.WriteBytes(e)
 }
 
-func (c *rawConnection) readerLoop() {
-loop:
-	for !c.isClosed() {
-		var hdr header
-		hdr.decodeXDR(c.xr)
-		if err := c.xr.Error(); err != nil {
-			c.close(err)
-			break loop
-		}
-		if hdr.version != 0 {
-			c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
-			break loop
+func (c *rawConnection) send(h header, es ...encodable) bool {
+	if h.msgID < 0 {
+		select {
+		case id := <-c.nextID:
+			h.msgID = id
+		case <-c.closed:
+			return false
 		}
+	}
+	msg := append([]encodable{h}, es...)
 
-		switch hdr.msgType {
-		case messageTypeIndex:
-			var im IndexMessage
-			im.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close(err)
-				break loop
-			} else {
-
-				// We run this (and the corresponding one for update, below)
-				// in a separate goroutine to avoid blocking the read loop.
-				// There is otherwise a potential deadlock where both sides
-				// has the model locked because it's sending a large index
-				// update and can't receive the large index update from the
-				// other side.
-
-				go c.receiver.Index(c.id, im.Repository, im.Files)
-			}
+	select {
+	case c.outbox <- msg:
+		return true
+	case <-c.closed:
+		return false
+	}
+}
 
-		case messageTypeIndexUpdate:
-			var im IndexMessage
-			im.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close(err)
-				break loop
-			} else {
-				go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
-			}
+func (c *rawConnection) writerLoop() {
+	var err error
+	for es := range c.outbox {
+		c.wmut.Lock()
+		for _, e := range es {
+			e.encodeXDR(c.xw)
+		}
 
-		case messageTypeRequest:
-			var req RequestMessage
-			req.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close(err)
-				break loop
-			}
-			go c.processRequest(hdr.msgID, req)
+		if err = c.flush(); err != nil {
+			c.wmut.Unlock()
+			c.close(err)
+			return
+		}
+		c.wmut.Unlock()
+	}
+}
 
-		case messageTypeResponse:
-			data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
+type flusher interface {
+	Flush() error
+}
 
-			if err := c.xr.Error(); err != nil {
-				c.close(err)
-				break loop
-			}
+func (c *rawConnection) flush() error {
+	if err := c.xw.Error(); err != nil {
+		return err
+	}
 
-			go func(hdr header, err error) {
-				c.imut.Lock()
-				rc, ok := c.awaiting[hdr.msgID]
-				delete(c.awaiting, hdr.msgID)
-				c.imut.Unlock()
+	if err := c.wb.Flush(); err != nil {
+		return err
+	}
 
-				if ok {
-					rc <- asyncResult{data, err}
-					close(rc)
-				}
-			}(hdr, c.xr.Error())
+	if f, ok := c.writer.(flusher); ok {
+		return f.Flush()
+	}
 
-		case messageTypePing:
-			c.wmut.Lock()
-			header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw)
-			err := c.flush()
-			c.wmut.Unlock()
-			if err != nil {
-				c.close(err)
-				break loop
-			}
+	return nil
+}
 
-		case messageTypePong:
-			c.imut.Lock()
-			rc, ok := c.awaiting[hdr.msgID]
+func (c *rawConnection) close(err error) {
+	c.imut.Lock()
+	c.wmut.Lock()
+	defer c.imut.Unlock()
+	defer c.wmut.Unlock()
 
-			if ok {
-				go func() {
-					rc <- asyncResult{}
-					close(rc)
-				}()
+	select {
+	case <-c.closed:
+		return
+	default:
+		close(c.closed)
 
-				delete(c.awaiting, hdr.msgID)
+		for i, ch := range c.awaiting {
+			if ch != nil {
+				close(ch)
+				c.awaiting[i] = nil
 			}
-			c.imut.Unlock()
+		}
 
-		case messageTypeClusterConfig:
-			var cm ClusterConfigMessage
-			cm.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close(err)
-				break loop
-			} else {
-				go c.receiver.ClusterConfig(c.id, cm)
-			}
+		c.writer.Close()
+		c.reader.Close()
 
-		default:
-			c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
-			break loop
-		}
+		c.receiver.Close(c.id, err)
 	}
 }
 
-func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
-	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
-
-	c.wmut.Lock()
-	header{0, msgID, messageTypeResponse}.encodeXDR(c.xw)
-	c.xw.WriteBytes(data)
-	err := c.flush()
-	c.wmut.Unlock()
-
-	buffers.Put(data)
-
-	if err != nil {
-		c.close(err)
+func (c *rawConnection) idGenerator() {
+	nextID := 0
+	for {
+		nextID = (nextID + 1) & 0xfff
+		select {
+		case c.nextID <- nextID:
+		case <-c.closed:
+			return
+		}
 	}
 }
 
@@ -437,9 +473,6 @@ func (c *rawConnection) pingerLoop() {
 	var rc = make(chan bool, 1)
 	ticker := time.Tick(pingIdleTime / 2)
 	for {
-		if c.isClosed() {
-			return
-		}
 		select {
 		case <-ticker:
 			go func() {
@@ -452,11 +485,23 @@ func (c *rawConnection) pingerLoop() {
 				}
 			case <-time.After(pingTimeout):
 				c.close(fmt.Errorf("ping timeout"))
+			case <-c.closed:
+				return
 			}
+
+		case <-c.closed:
+			return
 		}
 	}
 }
 
+func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
+	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
+
+	c.send(header{0, msgID, messageTypeResponse},
+		encodableBytes(data))
+}
+
 type Statistics struct {
 	At            time.Time
 	InBytesTotal  int

+ 1 - 3
protocol/protocol_test.go

@@ -174,9 +174,7 @@ func TestClose(t *testing.T) {
 
 	c0.close(nil)
 
-	if !c0.isClosed() {
-		t.Fatal("Connection should be closed")
-	}
+	<-c0.closed
 	if !m0.isClosed() {
 		t.Fatal("Connection should be closed")
 	}