Browse Source

Merge branch 'v0.8'

* v0.8:
  Don't leak writer and index goroutines on close
  Clean up protocol locking and closing
  Send initial index in batches
  Always send initial index, even if empty (ref #344)
  Simplify locking in protocol.Index
  Protocol state machine on receiving side
  Log client version on connect
  Handle query parameters in UPnP control URL (fixes #211)
  Avoid deadlock during initial scan (fixes #389)
  Add temporary debug logging for #344 (revert later)
  Tone down UPnP not found message (fixes #406)
Jakob Borg 11 years ago
parent
commit
3c4002e149
6 changed files with 193 additions and 102 deletions
  1. 4 1
      cmd/syncthing/main.go
  2. 5 0
      files/set.go
  3. 51 0
      files/set_debug.go
  4. 22 15
      model/model.go
  5. 95 82
      protocol/protocol.go
  6. 16 4
      upnp/upnp.go

+ 4 - 1
cmd/syncthing/main.go

@@ -483,7 +483,10 @@ func setupUPnP(r rand.Source) int {
 					l.Warnln("Failed to create UPnP port mapping")
 				}
 			} else {
-				l.Infof("No UPnP IGD device found, no port mapping created (%v)", err)
+				l.Infof("No UPnP gateway detected")
+				if debugNet {
+					l.Debugf("UPnP: %v", err)
+				}
 			}
 		}
 	} else {

+ 5 - 0
files/set.go

@@ -49,6 +49,7 @@ func (m *Set) Replace(id uint, fs []scanner.File) {
 	}
 
 	m.Lock()
+	log("Replace", id, len(fs))
 	if len(fs) == 0 || !m.equals(id, fs) {
 		m.changes[id]++
 		m.replace(id, fs)
@@ -65,6 +66,7 @@ func (m *Set) ReplaceWithDelete(id uint, fs []scanner.File) {
 	}
 
 	m.Lock()
+	log("ReplaceWithDelete", id, len(fs))
 	if len(fs) == 0 || !m.equals(id, fs) {
 		m.changes[id]++
 
@@ -102,7 +104,9 @@ func (m *Set) Update(id uint, fs []scanner.File) {
 	if debug {
 		l.Debugf("Update(%d, [%d])", id, len(fs))
 	}
+
 	m.Lock()
+	log("Update", id, len(fs))
 	m.update(id, fs)
 	m.changes[id]++
 	m.Unlock()
@@ -220,6 +224,7 @@ func (m *Set) equals(id uint, fs []scanner.File) bool {
 func (m *Set) update(cid uint, fs []scanner.File) {
 	remFiles := m.remoteKey[cid]
 	if remFiles == nil {
+		printLog()
 		l.Fatalln("update before replace for cid", cid)
 	}
 	for _, f := range fs {

+ 51 - 0
files/set_debug.go

@@ -0,0 +1,51 @@
+package files
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/calmh/syncthing/cid"
+)
+
+type logEntry struct {
+	time   time.Time
+	method string
+	cid    uint
+	node   string
+	nfiles int
+}
+
+func (l logEntry) String() string {
+	return fmt.Sprintf("%v: %s cid:%d node:%s nfiles:%d", l.time, l.method, l.cid, l.node, l.nfiles)
+}
+
+var (
+	debugLog  [10]logEntry
+	debugNext int
+	cm        *cid.Map
+)
+
+func SetCM(m *cid.Map) {
+	cm = m
+}
+
+func log(method string, id uint, nfiles int) {
+	e := logEntry{
+		time:   time.Now(),
+		method: method,
+		cid:    id,
+		nfiles: nfiles,
+	}
+	if cm != nil {
+		e.node = cm.Name(id)
+	}
+	debugLog[debugNext] = e
+	debugNext = (debugNext + 1) % len(debugLog)
+}
+
+func printLog() {
+	l.Debugln("--- Consistency error ---")
+	for _, e := range debugLog {
+		l.Debugln(e)
+	}
+}

+ 22 - 15
model/model.go

@@ -98,6 +98,9 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
 		sup:           suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
 	}
 
+	// TEMP: #344
+	files.SetCM(m.cm)
+
 	var timeout = 20 * 60 // seconds
 	if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
 		it, err := strconv.Atoi(t)
@@ -369,6 +372,8 @@ func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterCon
 		m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
 	}
 	m.pmut.Unlock()
+
+	l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
 }
 
 // Close removes the peer from the model and closes the underlying connection if possible.
@@ -451,19 +456,6 @@ func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
 	m.rmut.RUnlock()
 }
 
-func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) {
-	var sfs = make([]scanner.File, len(fs))
-	for i := 0; i < len(fs); i++ {
-		lamport.Default.Tick(fs[i].Version)
-		sfs[i] = fileFromFileInfo(fs[i])
-		sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
-	}
-
-	m.rmut.RLock()
-	m.repoFiles[repo].Replace(cid.LocalID, sfs)
-	m.rmut.RUnlock()
-}
-
 func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
 	m.rmut.RLock()
 	f := m.repoFiles[repo].Get(cid.LocalID, file)
@@ -528,7 +520,14 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
 			if debug {
 				l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
 			}
-			protoConn.Index(repo, idx)
+			const batchSize = 1000
+			for i := 0; i < len(idx); i += batchSize {
+				if len(idx[i:]) < batchSize {
+					protoConn.Index(repo, idx[i:])
+				} else {
+					protoConn.Index(repo, idx[i:i+batchSize])
+				}
+			}
 		}
 	}()
 }
@@ -733,7 +732,15 @@ func (m *Model) LoadIndexes(dir string) {
 	m.rmut.RLock()
 	for repo := range m.repoCfgs {
 		fs := m.loadIndex(repo, dir)
-		m.SeedLocal(repo, fs)
+
+		var sfs = make([]scanner.File, len(fs))
+		for i := 0; i < len(fs); i++ {
+			lamport.Default.Tick(fs[i].Version)
+			sfs[i] = fileFromFileInfo(fs[i])
+			sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
+		}
+
+		m.repoFiles[repo].Replace(cid.LocalID, sfs)
 	}
 	m.rmut.RUnlock()
 }

+ 95 - 82
protocol/protocol.go

@@ -28,6 +28,12 @@ const (
 	messageTypeIndexUpdate   = 6
 )
 
+const (
+	stateInitial = iota
+	stateCCRcvd
+	stateIdxRcvd
+)
+
 const (
 	FlagDeleted    uint32 = 1 << 12
 	FlagInvalid           = 1 << 13
@@ -70,26 +76,27 @@ type Connection interface {
 type rawConnection struct {
 	id       NodeID
 	receiver Model
+	state    int
 
 	reader io.ReadCloser
 	cr     *countingReader
 	xr     *xdr.Reader
-	writer io.WriteCloser
 
-	cw   *countingWriter
-	wb   *bufio.Writer
-	xw   *xdr.Writer
-	wmut sync.Mutex
+	writer io.WriteCloser
+	cw     *countingWriter
+	wb     *bufio.Writer
+	xw     *xdr.Writer
 
-	indexSent map[string]map[string]uint64
-	awaiting  []chan asyncResult
-	imut      sync.Mutex
+	awaiting    []chan asyncResult
+	awaitingMut sync.Mutex
 
-	idxMut sync.Mutex // ensures serialization of Index calls
+	idxSent map[string]map[string]uint64
+	idxMut  sync.Mutex // ensures serialization of Index calls
 
 	nextID chan int
 	outbox chan []encodable
 	closed chan struct{}
+	once   sync.Once
 }
 
 type asyncResult struct {
@@ -114,20 +121,21 @@ func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver M
 	wb := bufio.NewWriter(flwr)
 
 	c := rawConnection{
-		id:        nodeID,
-		receiver:  nativeModel{receiver},
-		reader:    flrd,
-		cr:        cr,
-		xr:        xdr.NewReader(flrd),
-		writer:    flwr,
-		cw:        cw,
-		wb:        wb,
-		xw:        xdr.NewWriter(wb),
-		awaiting:  make([]chan asyncResult, 0x1000),
-		indexSent: make(map[string]map[string]uint64),
-		outbox:    make(chan []encodable),
-		nextID:    make(chan int),
-		closed:    make(chan struct{}),
+		id:       nodeID,
+		receiver: nativeModel{receiver},
+		state:    stateInitial,
+		reader:   flrd,
+		cr:       cr,
+		xr:       xdr.NewReader(flrd),
+		writer:   flwr,
+		cw:       cw,
+		wb:       wb,
+		xw:       xdr.NewWriter(wb),
+		awaiting: make([]chan asyncResult, 0x1000),
+		idxSent:  make(map[string]map[string]uint64),
+		outbox:   make(chan []encodable),
+		nextID:   make(chan int),
+		closed:   make(chan struct{}),
 	}
 
 	go c.indexSerializerLoop()
@@ -148,31 +156,29 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
 	c.idxMut.Lock()
 	defer c.idxMut.Unlock()
 
-	c.imut.Lock()
 	var msgType int
-	if c.indexSent[repo] == nil {
+	if c.idxSent[repo] == nil {
 		// This is the first time we send an index.
 		msgType = messageTypeIndex
 
-		c.indexSent[repo] = make(map[string]uint64)
+		c.idxSent[repo] = make(map[string]uint64)
 		for _, f := range idx {
-			c.indexSent[repo][f.Name] = f.Version
+			c.idxSent[repo][f.Name] = f.Version
 		}
 	} else {
 		// We have sent one full index. Only send updates now.
 		msgType = messageTypeIndexUpdate
 		var diff []FileInfo
 		for _, f := range idx {
-			if vs, ok := c.indexSent[repo][f.Name]; !ok || f.Version != vs {
+			if vs, ok := c.idxSent[repo][f.Name]; !ok || f.Version != vs {
 				diff = append(diff, f)
-				c.indexSent[repo][f.Name] = f.Version
+				c.idxSent[repo][f.Name] = f.Version
 			}
 		}
 		idx = diff
 	}
-	c.imut.Unlock()
 
-	if len(idx) > 0 {
+	if msgType == messageTypeIndex || len(idx) > 0 {
 		c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
 	}
 }
@@ -186,13 +192,13 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
 		return nil, ErrClosed
 	}
 
-	c.imut.Lock()
+	c.awaitingMut.Lock()
 	if ch := c.awaiting[id]; ch != nil {
 		panic("id taken")
 	}
-	rc := make(chan asyncResult)
+	rc := make(chan asyncResult, 1)
 	c.awaiting[id] = rc
-	c.imut.Unlock()
+	c.awaitingMut.Unlock()
 
 	ok := c.send(header{0, id, messageTypeRequest},
 		RequestMessage{repo, name, uint64(offset), uint32(size)})
@@ -221,9 +227,9 @@ func (c *rawConnection) ping() bool {
 	}
 
 	rc := make(chan asyncResult, 1)
-	c.imut.Lock()
+	c.awaitingMut.Lock()
 	c.awaiting[id] = rc
-	c.imut.Unlock()
+	c.awaitingMut.Unlock()
 
 	ok := c.send(header{0, id, messageTypePing})
 	if !ok {
@@ -257,21 +263,34 @@ func (c *rawConnection) readerLoop() (err error) {
 
 		switch hdr.msgType {
 		case messageTypeIndex:
+			if c.state < stateCCRcvd {
+				return fmt.Errorf("protocol error: index message in state %d", c.state)
+			}
 			if err := c.handleIndex(); err != nil {
 				return err
 			}
+			c.state = stateIdxRcvd
 
 		case messageTypeIndexUpdate:
+			if c.state < stateIdxRcvd {
+				return fmt.Errorf("protocol error: index update message in state %d", c.state)
+			}
 			if err := c.handleIndexUpdate(); err != nil {
 				return err
 			}
 
 		case messageTypeRequest:
+			if c.state < stateIdxRcvd {
+				return fmt.Errorf("protocol error: request message in state %d", c.state)
+			}
 			if err := c.handleRequest(hdr); err != nil {
 				return err
 			}
 
 		case messageTypeResponse:
+			if c.state < stateIdxRcvd {
+				return fmt.Errorf("protocol error: response message in state %d", c.state)
+			}
 			if err := c.handleResponse(hdr); err != nil {
 				return err
 			}
@@ -283,9 +302,13 @@ func (c *rawConnection) readerLoop() (err error) {
 			c.handlePong(hdr)
 
 		case messageTypeClusterConfig:
+			if c.state != stateInitial {
+				return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
+			}
 			if err := c.handleClusterConfig(); err != nil {
 				return err
 			}
+			c.state = stateCCRcvd
 
 		default:
 			return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
@@ -309,11 +332,16 @@ func (c *rawConnection) indexSerializerLoop() {
 	// large index update from the other side. But we must also ensure to
 	// process the indexes in the order they are received, hence the separate
 	// routine and buffered channel.
-	for ii := range incomingIndexes {
-		if ii.update {
-			c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
-		} else {
-			c.receiver.Index(ii.id, ii.repo, ii.files)
+	for {
+		select {
+		case ii := <-incomingIndexes:
+			if ii.update {
+				c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
+			} else {
+				c.receiver.Index(ii.id, ii.repo, ii.files)
+			}
+		case <-c.closed:
+			return
 		}
 	}
 }
@@ -365,32 +393,25 @@ func (c *rawConnection) handleResponse(hdr header) error {
 		return err
 	}
 
-	go func(hdr header, err error) {
-		c.imut.Lock()
-		rc := c.awaiting[hdr.msgID]
+	c.awaitingMut.Lock()
+	if rc := c.awaiting[hdr.msgID]; rc != nil {
 		c.awaiting[hdr.msgID] = nil
-		c.imut.Unlock()
-
-		if rc != nil {
-			rc <- asyncResult{data, err}
-			close(rc)
-		}
-	}(hdr, c.xr.Error())
+		rc <- asyncResult{data, nil}
+		close(rc)
+	}
+	c.awaitingMut.Unlock()
 
 	return nil
 }
 
 func (c *rawConnection) handlePong(hdr header) {
-	c.imut.Lock()
+	c.awaitingMut.Lock()
 	if rc := c.awaiting[hdr.msgID]; rc != nil {
-		go func() {
-			rc <- asyncResult{}
-			close(rc)
-		}()
-
 		c.awaiting[hdr.msgID] = nil
+		rc <- asyncResult{}
+		close(rc)
 	}
-	c.imut.Unlock()
+	c.awaitingMut.Unlock()
 }
 
 func (c *rawConnection) handleClusterConfig() error {
@@ -434,18 +455,20 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
 
 func (c *rawConnection) writerLoop() {
 	var err error
-	for es := range c.outbox {
-		c.wmut.Lock()
-		for _, e := range es {
-			e.encodeXDR(c.xw)
-		}
+	for {
+		select {
+		case es := <-c.outbox:
+			for _, e := range es {
+				e.encodeXDR(c.xw)
+			}
 
-		if err = c.flush(); err != nil {
-			c.wmut.Unlock()
-			c.close(err)
+			if err = c.flush(); err != nil {
+				c.close(err)
+				return
+			}
+		case <-c.closed:
 			return
 		}
-		c.wmut.Unlock()
 	}
 }
 
@@ -470,29 +493,20 @@ func (c *rawConnection) flush() error {
 }
 
 func (c *rawConnection) close(err error) {
-	c.imut.Lock()
-	c.wmut.Lock()
-	defer c.imut.Unlock()
-	defer c.wmut.Unlock()
-
-	select {
-	case <-c.closed:
-		return
-	default:
+	c.once.Do(func() {
 		close(c.closed)
 
+		c.awaitingMut.Lock()
 		for i, ch := range c.awaiting {
 			if ch != nil {
 				close(ch)
 				c.awaiting[i] = nil
 			}
 		}
-
-		c.writer.Close()
-		c.reader.Close()
+		c.awaitingMut.Unlock()
 
 		go c.receiver.Close(c.id, err)
-	}
+	})
 }
 
 func (c *rawConnection) idGenerator() {
@@ -554,8 +568,7 @@ func (c *rawConnection) pingerLoop() {
 func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
 	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
 
-	c.send(header{0, msgID, messageTypeResponse},
-		encodableBytes(data))
+	c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
 }
 
 type Statistics struct {

+ 16 - 4
upnp/upnp.go

@@ -203,12 +203,24 @@ func getServiceURL(rootURL string) (string, error) {
 	}
 
 	u, _ := url.Parse(rootURL)
-	if svc.ControlURL[0] == '/' {
-		u.Path = svc.ControlURL
+	replaceRawPath(u, svc.ControlURL)
+	return u.String(), nil
+}
+
+func replaceRawPath(u *url.URL, rp string) {
+	var p, q string
+	fs := strings.Split(rp, "?")
+	p = fs[0]
+	if len(fs) > 1 {
+		q = fs[1]
+	}
+
+	if p[0] == '/' {
+		u.Path = p
 	} else {
-		u.Path += svc.ControlURL
+		u.Path += p
 	}
-	return u.String(), nil
+	u.RawQuery = q
 }
 
 func soapRequest(url, function, message string) error {