Browse Source

Simplify index sending, prevent ping timeout

Jakob Borg 12 years ago
parent
commit
7e0be89052
4 changed files with 53 additions and 33 deletions
  1. 2 6
      main.go
  2. 27 17
      model.go
  3. 0 1
      model_puller.go
  4. 24 9
      protocol/protocol.go

+ 2 - 6
main.go

@@ -215,9 +215,7 @@ listen:
 
 		for nodeID := range nodeAddrs {
 			if nodeID == remoteID {
-				nc := protocol.NewConnection(remoteID, conn, conn, m)
-				m.AddNode(nc)
-				infoln("Connected to node", remoteID, "(in)")
+				m.AddConnection(conn, remoteID)
 				continue listen
 			}
 		}
@@ -286,9 +284,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
 					continue
 				}
 
-				nc := protocol.NewConnection(nodeID, conn, conn, m)
-				m.AddNode(nc)
-				infoln("Connected to node", remoteID, "(out)")
+				m.AddConnection(conn, remoteID)
 				continue nextNode
 			}
 		}

+ 27 - 17
model.go

@@ -13,6 +13,7 @@ acquire locks, but document what locks they require.
 
 import (
 	"fmt"
+	"io"
 	"os"
 	"path"
 	"sync"
@@ -26,11 +27,12 @@ type Model struct {
 	sync.RWMutex
 	dir string
 
-	global map[string]File // the latest version of each file as it exists in the cluster
-	local  map[string]File // the files we currently have locally on disk
-	remote map[string]map[string]File
-	need   map[string]bool // the files we need to update
-	nodes  map[string]*protocol.Connection
+	global  map[string]File // the latest version of each file as it exists in the cluster
+	local   map[string]File // the files we currently have locally on disk
+	remote  map[string]map[string]File
+	need    map[string]bool // the files we need to update
+	nodes   map[string]*protocol.Connection
+	rawConn map[string]io.ReadWriteCloser
 
 	updatedLocal int64 // timestamp of last update to local
 	updateGlobal int64 // timestamp of last update to remote
@@ -54,6 +56,7 @@ func NewModel(dir string) *Model {
 		remote:       make(map[string]map[string]File),
 		need:         make(map[string]bool),
 		nodes:        make(map[string]*protocol.Connection),
+		rawConn:      make(map[string]io.ReadWriteCloser),
 		lastIdxBcast: time.Now(),
 	}
 
@@ -192,6 +195,12 @@ func (m *Model) Close(node string, err error) {
 	m.Lock()
 	defer m.Unlock()
 
+	conn, ok := m.rawConn[node]
+	if !ok {
+		return
+	}
+	conn.Close()
+
 	if err != nil {
 		warnf("Disconnected from node %s: %v", node, err)
 	} else {
@@ -200,6 +209,7 @@ func (m *Model) Close(node string, err error) {
 
 	delete(m.remote, node)
 	delete(m.nodes, node)
+	delete(m.rawConn, node)
 
 	m.recomputeGlobal()
 	m.recomputeNeed()
@@ -460,24 +470,24 @@ func (m *Model) protocolIndex() []protocol.FileInfo {
 	return index
 }
 
-func (m *Model) AddNode(node *protocol.Connection) {
+func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
+	node := protocol.NewConnection(nodeID, conn, conn, m)
+
 	m.Lock()
-	m.nodes[node.ID] = node
+	m.nodes[nodeID] = node
+	m.rawConn[nodeID] = conn
 	m.Unlock()
+
+	infoln("Connected to node", nodeID)
+
 	m.RLock()
 	idx := m.protocolIndex()
 	m.RUnlock()
 
-	for i := 0; i < len(idx); i += 1000 {
-		s := i + 1000
-		if s > len(idx) {
-			s = len(idx)
-		}
-		if opts.Debug.TraceNet {
-			debugf("NET IDX(out/add): %s: %d:%d", node.ID, i, s)
-		}
-		node.Index(idx[i:s])
-	}
+	go func() {
+		node.Index(idx)
+		infoln("Sent initial index to node", nodeID)
+	}()
 }
 
 func fileFromFileInfo(f protocol.FileInfo) File {

+ 0 - 1
model_puller.go

@@ -129,7 +129,6 @@ func (m *Model) pullFile(name string) error {
 }
 
 func (m *Model) puller() {
-
 	for {
 		time.Sleep(time.Second)
 

+ 24 - 9
protocol/protocol.go

@@ -57,6 +57,9 @@ type Connection struct {
 	nextId    int
 	indexSent map[string]int64
 
+	hasSentIndex  bool
+	hasRecvdIndex bool
+
 	lastStatistics Statistics
 	statisticsLock sync.Mutex
 }
@@ -126,7 +129,9 @@ func (c *Connection) Index(idx []FileInfo) {
 	c.mwriter.writeIndex(idx)
 	err := c.flush()
 	c.nextId = (c.nextId + 1) & 0xfff
+	c.hasSentIndex = true
 	c.Unlock()
+
 	if err != nil {
 		c.Close(err)
 		return
@@ -252,6 +257,9 @@ loop:
 			} else {
 				c.receiver.Index(c.ID, files)
 			}
+			c.Lock()
+			c.hasRecvdIndex = true
+			c.Unlock()
 
 		case messageTypeIndexUpdate:
 			files := c.mreader.readIndex()
@@ -343,16 +351,23 @@ func (c *Connection) pingerLoop() {
 	var rc = make(chan bool, 1)
 	for {
 		time.Sleep(pingIdleTime / 2)
-		go func() {
-			rc <- c.Ping()
-		}()
-		select {
-		case ok := <-rc:
-			if !ok {
-				c.Close(fmt.Errorf("Ping failure"))
+
+		c.RLock()
+		ready := c.hasRecvdIndex && c.hasSentIndex
+		c.RUnlock()
+
+		if ready {
+			go func() {
+				rc <- c.Ping()
+			}()
+			select {
+			case ok := <-rc:
+				if !ok {
+					c.Close(fmt.Errorf("Ping failure"))
+				}
+			case <-time.After(pingTimeout):
+				c.Close(fmt.Errorf("Ping timeout"))
 			}
-		case <-time.After(pingTimeout):
-			c.Close(fmt.Errorf("Ping timeout"))
 		}
 	}
 }