Browse Source

Revert "Streamline error handling and locking" (fixes #172)

This reverts commit 116f232f5a20ca0e4d61f776c99e9a10b1cfa8dc.
Jakob Borg 11 years ago
parent
commit
28e347002a
2 changed files with 129 additions and 128 deletions
  1. 7 9
      cmd/syncthing/model.go
  2. 122 119
      protocol/protocol.go

+ 7 - 9
cmd/syncthing/model.go

@@ -423,16 +423,14 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
 	cm := m.clusterConfig(nodeID)
 	protoConn.ClusterConfig(cm)
 
-	var idxToSend = make(map[string][]protocol.FileInfo)
-
-	m.rmut.RLock()
-	for _, repo := range m.nodeRepos[nodeID] {
-		idxToSend[repo] = m.protocolIndex(repo)
-	}
-	m.rmut.RUnlock()
-
 	go func() {
-		for repo, idx := range idxToSend {
+		m.rmut.RLock()
+		repos := m.nodeRepos[nodeID]
+		m.rmut.RUnlock()
+		for _, repo := range repos {
+			m.rmut.RLock()
+			idx := m.protocolIndex(repo)
+			m.rmut.RUnlock()
 			if debugNet {
 				dlog.Printf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
 			}

+ 122 - 119
protocol/protocol.go

@@ -64,26 +64,24 @@ type Connection interface {
 }
 
 type rawConnection struct {
-	id       string
-	receiver Model
-
-	reader io.ReadCloser
-	cr     *countingReader
-	xr     *xdr.Reader
-	writer io.WriteCloser
-
-	cw   *countingWriter
-	wb   *bufio.Writer
-	xw   *xdr.Writer
-	wmut sync.Mutex
-
-	close  chan error
-	closed chan struct{}
-
+	sync.RWMutex
+
+	id        string
+	receiver  Model
+	reader    io.ReadCloser
+	cr        *countingReader
+	xr        *xdr.Reader
+	writer    io.WriteCloser
+	cw        *countingWriter
+	wb        *bufio.Writer
+	xw        *xdr.Writer
+	closed    chan struct{}
 	awaiting  map[int]chan asyncResult
 	nextID    int
 	indexSent map[string]map[string][2]int64
-	imut      sync.Mutex
+
+	hasSentIndex  bool
+	hasRecvdIndex bool
 }
 
 type asyncResult struct {
@@ -117,13 +115,11 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
 		cw:        cw,
 		wb:        wb,
 		xw:        xdr.NewWriter(wb),
-		close:     make(chan error),
 		closed:    make(chan struct{}),
 		awaiting:  make(map[int]chan asyncResult),
 		indexSent: make(map[string]map[string][2]int64),
 	}
 
-	go c.closer()
 	go c.readerLoop()
 	go c.pingerLoop()
 
@@ -136,11 +132,11 @@ func (c *rawConnection) ID() string {
 
 // Index writes the list of file information to the connected peer node
 func (c *rawConnection) Index(repo string, idx []FileInfo) {
+	c.Lock()
 	if c.isClosed() {
+		c.Unlock()
 		return
 	}
-
-	c.imut.Lock()
 	var msgType int
 	if c.indexSent[repo] == nil {
 		// This is the first time we send an index.
@@ -163,48 +159,45 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
 		idx = diff
 	}
 
-	id := c.nextID
+	header{0, c.nextID, msgType}.encodeXDR(c.xw)
+	_, err := IndexMessage{repo, idx}.encodeXDR(c.xw)
+	if err == nil {
+		err = c.flush()
+	}
 	c.nextID = (c.nextID + 1) & 0xfff
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, msgType}.encodeXDR(c.xw)
-	IndexMessage{repo, idx}.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
+	c.hasSentIndex = true
+	c.Unlock()
 
 	if err != nil {
-		c.close <- err
+		c.close(err)
 		return
 	}
 }
 
 // Request returns the bytes for the specified block after fetching them from the connected peer.
 func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
+	c.Lock()
 	if c.isClosed() {
+		c.Unlock()
 		return nil, ErrClosed
 	}
-
-	c.imut.Lock()
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
 	rc := make(chan asyncResult)
-	if _, ok := c.awaiting[id]; ok {
+	if _, ok := c.awaiting[c.nextID]; ok {
 		panic("id taken")
 	}
-	c.awaiting[id] = rc
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, messageTypeRequest}.encodeXDR(c.xw)
-	RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
-
+	c.awaiting[c.nextID] = rc
+	header{0, c.nextID, messageTypeRequest}.encodeXDR(c.xw)
+	_, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
+	if err == nil {
+		err = c.flush()
+	}
 	if err != nil {
-		c.close <- err
+		c.Unlock()
+		c.close(err)
 		return nil, err
 	}
+	c.nextID = (c.nextID + 1) & 0xfff
+	c.Unlock()
 
 	res, ok := <-rc
 	if !ok {
@@ -215,47 +208,46 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
 
 // ClusterConfig send the cluster configuration message to the peer and returns any error
 func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
+	c.Lock()
+	defer c.Unlock()
+
 	if c.isClosed() {
 		return
 	}
 
-	c.imut.Lock()
-	id := c.nextID
+	header{0, c.nextID, messageTypeClusterConfig}.encodeXDR(c.xw)
 	c.nextID = (c.nextID + 1) & 0xfff
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, messageTypeClusterConfig}.encodeXDR(c.xw)
-	config.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
 
+	_, err := config.encodeXDR(c.xw)
+	if err == nil {
+		err = c.flush()
+	}
 	if err != nil {
-		c.close <- err
+		c.close(err)
 	}
 }
 
 func (c *rawConnection) ping() bool {
+	c.Lock()
 	if c.isClosed() {
+		c.Unlock()
 		return false
 	}
-
-	c.imut.Lock()
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
 	rc := make(chan asyncResult, 1)
-	c.awaiting[id] = rc
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, messageTypePing}.encodeXDR(c.xw)
+	c.awaiting[c.nextID] = rc
+	header{0, c.nextID, messageTypePing}.encodeXDR(c.xw)
 	err := c.flush()
-	c.wmut.Unlock()
-
 	if err != nil {
-		c.close <- err
+		c.Unlock()
+		c.close(err)
+		return false
+	} else if c.xw.Error() != nil {
+		c.Unlock()
+		c.close(c.xw.Error())
 		return false
 	}
+	c.nextID = (c.nextID + 1) & 0xfff
+	c.Unlock()
 
 	res, ok := <-rc
 	return ok && res.err == nil
@@ -266,24 +258,21 @@ type flusher interface {
 }
 
 func (c *rawConnection) flush() error {
-	if err := c.xw.Error(); err != nil {
-		return err
-	}
-
-	if err := c.wb.Flush(); err != nil {
-		return err
-	}
-
+	c.wb.Flush()
 	if f, ok := c.writer.(flusher); ok {
 		return f.Flush()
 	}
-
 	return nil
 }
 
-func (c *rawConnection) closer() {
-	err := <-c.close
-
+func (c *rawConnection) close(err error) {
+	c.Lock()
+	select {
+	case <-c.closed:
+		c.Unlock()
+		return
+	default:
+	}
 	close(c.closed)
 	for _, ch := range c.awaiting {
 		close(ch)
@@ -291,6 +280,7 @@ func (c *rawConnection) closer() {
 	c.awaiting = nil
 	c.writer.Close()
 	c.reader.Close()
+	c.Unlock()
 
 	c.receiver.Close(c.id, err)
 }
@@ -309,12 +299,12 @@ loop:
 	for !c.isClosed() {
 		var hdr header
 		hdr.decodeXDR(c.xr)
-		if err := c.xr.Error(); err != nil {
-			c.close <- err
+		if c.xr.Error() != nil {
+			c.close(c.xr.Error())
 			break loop
 		}
 		if hdr.version != 0 {
-			c.close <- fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
+			c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
 			break loop
 		}
 
@@ -322,8 +312,8 @@ loop:
 		case messageTypeIndex:
 			var im IndexMessage
 			im.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			} else {
 
@@ -336,12 +326,15 @@ loop:
 
 				go c.receiver.Index(c.id, im.Repository, im.Files)
 			}
+			c.Lock()
+			c.hasRecvdIndex = true
+			c.Unlock()
 
 		case messageTypeIndexUpdate:
 			var im IndexMessage
 			im.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			} else {
 				go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
@@ -350,8 +343,8 @@ loop:
 		case messageTypeRequest:
 			var req RequestMessage
 			req.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			}
 			go c.processRequest(hdr.msgID, req)
@@ -359,16 +352,16 @@ loop:
 		case messageTypeResponse:
 			data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
 
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			}
 
 			go func(hdr header, err error) {
-				c.imut.Lock()
+				c.Lock()
 				rc, ok := c.awaiting[hdr.msgID]
 				delete(c.awaiting, hdr.msgID)
-				c.imut.Unlock()
+				c.Unlock()
 
 				if ok {
 					rc <- asyncResult{data, err}
@@ -377,41 +370,44 @@ loop:
 			}(hdr, c.xr.Error())
 
 		case messageTypePing:
-			c.wmut.Lock()
+			c.Lock()
 			header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw)
 			err := c.flush()
-			c.wmut.Unlock()
+			c.Unlock()
 			if err != nil {
-				c.close <- err
+				c.close(err)
+				break loop
+			} else if c.xw.Error() != nil {
+				c.close(c.xw.Error())
 				break loop
 			}
 
 		case messageTypePong:
-			c.imut.Lock()
+			c.RLock()
 			rc, ok := c.awaiting[hdr.msgID]
+			c.RUnlock()
 
 			if ok {
-				go func() {
-					rc <- asyncResult{}
-					close(rc)
-				}()
+				rc <- asyncResult{}
+				close(rc)
 
+				c.Lock()
 				delete(c.awaiting, hdr.msgID)
+				c.Unlock()
 			}
-			c.imut.Unlock()
 
 		case messageTypeClusterConfig:
 			var cm ClusterConfigMessage
 			cm.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			} else {
 				go c.receiver.ClusterConfig(c.id, cm)
 			}
 
 		default:
-			c.close <- fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
+			c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
 			break loop
 		}
 	}
@@ -420,16 +416,17 @@ loop:
 func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
 	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
 
-	c.wmut.Lock()
+	c.Lock()
 	header{0, msgID, messageTypeResponse}.encodeXDR(c.xw)
-	c.xw.WriteBytes(data)
-	err := c.flush()
-	c.wmut.Unlock()
+	_, err := c.xw.WriteBytes(data)
+	if err == nil {
+		err = c.flush()
+	}
+	c.Unlock()
 
 	buffers.Put(data)
-
 	if err != nil {
-		c.close <- err
+		c.close(err)
 	}
 }
 
@@ -439,16 +436,22 @@ func (c *rawConnection) pingerLoop() {
 	for {
 		select {
 		case <-ticker:
-			go func() {
-				rc <- c.ping()
-			}()
-			select {
-			case ok := <-rc:
-				if !ok {
-					c.close <- fmt.Errorf("ping failure")
+			c.RLock()
+			ready := c.hasRecvdIndex && c.hasSentIndex
+			c.RUnlock()
+
+			if ready {
+				go func() {
+					rc <- c.ping()
+				}()
+				select {
+				case ok := <-rc:
+					if !ok {
+						c.close(fmt.Errorf("ping failure"))
+					}
+				case <-time.After(pingTimeout):
+					c.close(fmt.Errorf("ping timeout"))
 				}
-			case <-time.After(pingTimeout):
-				c.close <- fmt.Errorf("ping timeout")
 			}
 		case <-c.closed:
 			return