Browse Source

Avoid deadlock in index exchange by more fine grained locking

Jakob Borg 12 years ago
parent
commit
1c757db153
2 changed files with 16 additions and 13 deletions
  1. 8 8
      protocol/messages_test.go
  2. 8 5
      protocol/protocol.go

+ 8 - 8
protocol/messages_test.go

@@ -32,10 +32,10 @@ func TestIndex(t *testing.T) {
 	}
 
 	var buf = new(bytes.Buffer)
-	var wr = marshalWriter{buf, 0, nil}
+	var wr = marshalWriter{w: buf}
 	wr.writeIndex(idx)
 
-	var rd = marshalReader{buf, 0, nil}
+	var rd = marshalReader{r: buf}
 	var idx2 = rd.readIndex()
 
 	if !reflect.DeepEqual(idx, idx2) {
@@ -47,9 +47,9 @@ func TestRequest(t *testing.T) {
 	f := func(name string, offset uint64, size uint32, hash []byte) bool {
 		var buf = new(bytes.Buffer)
 		var req = request{name, offset, size, hash}
-		var wr = marshalWriter{buf, 0, nil}
+		var wr = marshalWriter{w: buf}
 		wr.writeRequest(req)
-		var rd = marshalReader{buf, 0, nil}
+		var rd = marshalReader{r: buf}
 		var req2 = rd.readRequest()
 		return req.name == req2.name &&
 			req.offset == req2.offset &&
@@ -64,9 +64,9 @@ func TestRequest(t *testing.T) {
 func TestResponse(t *testing.T) {
 	f := func(data []byte) bool {
 		var buf = new(bytes.Buffer)
-		var wr = marshalWriter{buf, 0, nil}
+		var wr = marshalWriter{w: buf}
 		wr.writeResponse(data)
-		var rd = marshalReader{buf, 0, nil}
+		var rd = marshalReader{r: buf}
 		var read = rd.readResponse()
 		return bytes.Compare(read, data) == 0
 	}
@@ -98,7 +98,7 @@ func BenchmarkWriteIndex(b *testing.B) {
 		},
 	}
 
-	var wr = marshalWriter{ioutil.Discard, 0, nil}
+	var wr = marshalWriter{w: ioutil.Discard}
 
 	for i := 0; i < b.N; i++ {
 		wr.writeIndex(idx)
@@ -107,7 +107,7 @@ func BenchmarkWriteIndex(b *testing.B) {
 
 func BenchmarkWriteRequest(b *testing.B) {
 	var req = request{"blah blah", 1231323, 13123123, []byte("hash hash hash")}
-	var wr = marshalWriter{ioutil.Discard, 0, nil}
+	var wr = marshalWriter{w: ioutil.Discard}
 
 	for i := 0; i < b.N; i++ {
 		wr.writeRequest(req)

+ 8 - 5
protocol/protocol.go

@@ -45,6 +45,7 @@ type Model interface {
 
 type Connection struct {
 	sync.RWMutex
+
 	ID             string
 	receiver       Model
 	reader         io.Reader
@@ -54,10 +55,12 @@ type Connection struct {
 	closed         bool
 	awaiting       map[int]chan asyncResult
 	nextId         int
-	lastReceive    time.Time
 	peerLatency    time.Duration
 	lastStatistics Statistics
 	indexSent      map[string]int64
+
+	lastReceive     time.Time
+	lastReceiveLock sync.RWMutex
 }
 
 var ErrClosed = errors.New("Connection closed")
@@ -234,9 +237,9 @@ func (c *Connection) readerLoop() {
 			break
 		}
 
-		c.Lock()
+		c.lastReceiveLock.Lock()
 		c.lastReceive = time.Now()
-		c.Unlock()
+		c.lastReceiveLock.Unlock()
 
 		switch hdr.msgType {
 		case messageTypeIndex:
@@ -334,9 +337,9 @@ func (c *Connection) processRequest(msgID int) {
 func (c *Connection) pingerLoop() {
 	var rc = make(chan time.Duration, 1)
 	for !c.isClosed() {
-		c.RLock()
+		c.lastReceiveLock.RLock()
 		lr := c.lastReceive
-		c.RUnlock()
+		c.lastReceiveLock.RUnlock()
 
 		if time.Since(lr) > pingIdleTime {
 			go func() {