Browse Source

Locking/Ping cleanup

Jakob Borg 12 years ago
parent
commit
fc4b23fbc6
4 changed files with 56 additions and 55 deletions
  1. 0 5
      main.go
  2. 9 5
      model.go
  3. 44 42
      protocol/protocol.go
  4. 3 3
      protocol/protocol_test.go

+ 0 - 5
main.go

@@ -282,11 +282,6 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
 				nc := protocol.NewConnection(nodeID, conn, conn, m)
 				okln("Connected to node", remoteID, "(out)")
 				m.AddNode(nc)
-				if opts.Debug.TraceNet {
-					t0 := time.Now()
-					nc.Ping()
-					timing("NET: Ping reply", t0)
-				}
 				continue nextNode
 			}
 		}

+ 9 - 5
model.go

@@ -60,7 +60,7 @@ func (m *Model) Start() {
 
 func (m *Model) printStats() {
 	for {
-		time.Sleep(15 * time.Second)
+		time.Sleep(60 * time.Second)
 		m.RLock()
 		for node, conn := range m.nodes {
 			stats := conn.Statistics()
@@ -223,7 +223,7 @@ func (m *Model) ReplaceLocal(fs []File) {
 		m.recomputeGlobal()
 		m.recomputeNeed()
 		m.updated = time.Now().Unix()
-		go m.broadcastIndex()
+		m.broadcastIndex()
 	}
 }
 
@@ -231,10 +231,11 @@ func (m *Model) ReplaceLocal(fs []File) {
 func (m *Model) broadcastIndex() {
 	idx := m.protocolIndex()
 	for _, node := range m.nodes {
+		node := node
 		if opts.Debug.TraceNet {
 			debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
 		}
-		node.Index(idx)
+		go node.Index(idx)
 	}
 }
 
@@ -271,7 +272,7 @@ func (m *Model) UpdateLocal(f File) {
 		m.recomputeGlobal()
 		m.recomputeNeed()
 		m.updated = time.Now().Unix()
-		go m.broadcastIndex()
+		m.broadcastIndex()
 	}
 }
 
@@ -400,5 +401,8 @@ func (m *Model) AddNode(node *protocol.Connection) {
 	if opts.Debug.TraceNet {
 		debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
 	}
-	node.Index(idx)
+
+	// Sending the index might take a while if we have many files and a slow
+	// uplink. Return from AddNode in the meantime.
+	go node.Index(idx)
 }

+ 44 - 42
protocol/protocol.go

@@ -42,13 +42,13 @@ type Model interface {
 }
 
 type Connection struct {
+	sync.RWMutex
 	ID             string
 	receiver       Model
 	reader         io.Reader
 	mreader        *marshalReader
 	writer         io.Writer
 	mwriter        *marshalWriter
-	wLock          sync.RWMutex
 	closed         bool
 	awaiting       map[int]chan asyncResult
 	nextId         int
@@ -94,12 +94,12 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
 
 // Index writes the list of file information to the connected peer node
 func (c *Connection) Index(idx []FileInfo) {
-	c.wLock.Lock()
+	c.Lock()
 	c.mwriter.writeHeader(header{0, c.nextId, messageTypeIndex})
 	c.mwriter.writeIndex(idx)
 	err := c.flush()
 	c.nextId = (c.nextId + 1) & 0xfff
-	c.wLock.Unlock()
+	c.Unlock()
 	if err != nil || c.mwriter.err != nil {
 		c.close()
 		return
@@ -108,24 +108,24 @@ func (c *Connection) Index(idx []FileInfo) {
 
 // Request returns the bytes for the specified block after fetching them from the connected peer.
 func (c *Connection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
-	c.wLock.Lock()
+	c.Lock()
 	rc := make(chan asyncResult)
 	c.awaiting[c.nextId] = rc
 	c.mwriter.writeHeader(header{0, c.nextId, messageTypeRequest})
 	c.mwriter.writeRequest(request{name, offset, size, hash})
 	if c.mwriter.err != nil {
-		c.wLock.Unlock()
+		c.Unlock()
 		c.close()
 		return nil, c.mwriter.err
 	}
 	err := c.flush()
 	if err != nil {
-		c.wLock.Unlock()
+		c.Unlock()
 		c.close()
 		return nil, err
 	}
 	c.nextId = (c.nextId + 1) & 0xfff
-	c.wLock.Unlock()
+	c.Unlock()
 
 	res, ok := <-rc
 	if !ok {
@@ -134,22 +134,23 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt
 	return res.val, res.err
 }
 
-func (c *Connection) Ping() bool {
-	c.wLock.Lock()
+func (c *Connection) Ping() (time.Duration, bool) {
+	c.Lock()
 	rc := make(chan asyncResult)
 	c.awaiting[c.nextId] = rc
+	t0 := time.Now()
 	c.mwriter.writeHeader(header{0, c.nextId, messageTypePing})
 	err := c.flush()
 	if err != nil || c.mwriter.err != nil {
-		c.wLock.Unlock()
+		c.Unlock()
 		c.close()
-		return false
+		return 0, false
 	}
 	c.nextId = (c.nextId + 1) & 0xfff
-	c.wLock.Unlock()
+	c.Unlock()
 
 	_, ok := <-rc
-	return ok
+	return time.Since(t0), ok
 }
 
 func (c *Connection) Stop() {
@@ -167,9 +168,9 @@ func (c *Connection) flush() error {
 }
 
 func (c *Connection) close() {
-	c.wLock.Lock()
+	c.Lock()
 	if c.closed {
-		c.wLock.Unlock()
+		c.Unlock()
 		return
 	}
 	c.closed = true
@@ -177,14 +178,14 @@ func (c *Connection) close() {
 		close(ch)
 	}
 	c.awaiting = nil
-	c.wLock.Unlock()
+	c.Unlock()
 
 	c.receiver.Close(c.ID)
 }
 
 func (c *Connection) isClosed() bool {
-	c.wLock.RLock()
-	defer c.wLock.RUnlock()
+	c.RLock()
+	defer c.RUnlock()
 	return c.closed
 }
 
@@ -201,9 +202,9 @@ func (c *Connection) readerLoop() {
 			break
 		}
 
-		c.wLock.Lock()
+		c.Lock()
 		c.lastReceive = time.Now()
-		c.wLock.Unlock()
+		c.Unlock()
 
 		switch hdr.msgType {
 		case messageTypeIndex:
@@ -226,41 +227,41 @@ func (c *Connection) readerLoop() {
 			if c.mreader.err != nil {
 				c.close()
 			} else {
-				c.wLock.RLock()
+				c.RLock()
 				rc, ok := c.awaiting[hdr.msgID]
-				c.wLock.RUnlock()
+				c.RUnlock()
 
 				if ok {
 					rc <- asyncResult{data, c.mreader.err}
 					close(rc)
 
-					c.wLock.Lock()
+					c.Lock()
 					delete(c.awaiting, hdr.msgID)
-					c.wLock.Unlock()
+					c.Unlock()
 				}
 			}
 
 		case messageTypePing:
-			c.wLock.Lock()
+			c.Lock()
 			c.mwriter.writeUint32(encodeHeader(header{0, hdr.msgID, messageTypePong}))
 			err := c.flush()
-			c.wLock.Unlock()
+			c.Unlock()
 			if err != nil || c.mwriter.err != nil {
 				c.close()
 			}
 
 		case messageTypePong:
-			c.wLock.RLock()
+			c.RLock()
 			rc, ok := c.awaiting[hdr.msgID]
-			c.wLock.RUnlock()
+			c.RUnlock()
 
 			if ok {
 				rc <- asyncResult{}
 				close(rc)
 
-				c.wLock.Lock()
+				c.Lock()
 				delete(c.awaiting, hdr.msgID)
-				c.wLock.Unlock()
+				c.Unlock()
 			}
 
 		default:
@@ -277,11 +278,11 @@ func (c *Connection) processRequest(msgID int) {
 	} else {
 		go func() {
 			data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash)
-			c.wLock.Lock()
+			c.Lock()
 			c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
 			c.mwriter.writeResponse(data)
 			err := c.flush()
-			c.wLock.Unlock()
+			c.Unlock()
 			buffers.Put(data)
 			if c.mwriter.err != nil || err != nil {
 				c.close()
@@ -291,23 +292,24 @@ func (c *Connection) processRequest(msgID int) {
 }
 
 func (c *Connection) pingerLoop() {
-	var rc = make(chan time.Duration)
+	var rc = make(chan time.Duration, 1)
 	for !c.isClosed() {
-		c.wLock.RLock()
+		c.RLock()
 		lr := c.lastReceive
-		c.wLock.RUnlock()
+		c.RUnlock()
 
 		if time.Since(lr) > pingIdleTime {
 			go func() {
-				t0 := time.Now()
-				c.Ping()
-				rc <- time.Since(t0)
+				t, ok := c.Ping()
+				if ok {
+					rc <- t
+				}
 			}()
 			select {
 			case lat := <-rc:
-				c.wLock.Lock()
+				c.Lock()
 				c.peerLatency = (c.peerLatency + lat) / 2
-				c.wLock.Unlock()
+				c.Unlock()
 			case <-time.After(pingTimeout):
 				c.close()
 			}
@@ -326,8 +328,8 @@ type Statistics struct {
 }
 
 func (c *Connection) Statistics() Statistics {
-	c.wLock.Lock()
-	defer c.wLock.Unlock()
+	c.Lock()
+	defer c.Unlock()
 
 	secs := time.Since(c.lastStatistics.At).Seconds()
 	stats := Statistics{

+ 3 - 3
protocol/protocol_test.go

@@ -46,10 +46,10 @@ func TestPing(t *testing.T) {
 	c0 := NewConnection("c0", ar, bw, nil)
 	c1 := NewConnection("c1", br, aw, nil)
 
-	if !c0.Ping() {
+	if _, ok := c0.Ping(); !ok {
 		t.Error("c0 ping failed")
 	}
-	if !c1.Ping() {
+	if _, ok := c1.Ping(); !ok {
 		t.Error("c1 ping failed")
 	}
 }
@@ -70,7 +70,7 @@ func TestPingErr(t *testing.T) {
 			c0 := NewConnection("c0", ar, ebw, m0)
 			NewConnection("c1", br, eaw, m1)
 
-			res := c0.Ping()
+			_, res := c0.Ping()
 			if (i < 4 || j < 4) && res {
 				t.Errorf("Unexpected ping success; i=%d, j=%d", i, j)
 			} else if (i >= 8 && j >= 8) && !res {