Browse Source

Fix handling of changed/deleted directories (fixes #231)

Jakob Borg 11 years ago
parent
commit
dba40eefb1
8 changed files with 122 additions and 44 deletions
  1. 2 2
      cmd/stpidx/main.go
  2. 11 0
      cmd/syncthing/gui.go
  3. 2 6
      files/set.go
  4. 1 0
      integration/.gitignore
  5. 7 4
      integration/folders.sh
  6. 13 6
      model/blockqueue.go
  7. 22 6
      model/model.go
  8. 64 20
      model/puller.go

+ 2 - 2
cmd/stpidx/main.go

@@ -41,8 +41,8 @@ func main() {
 		inv := file.Flags&protocol.FlagInvalid != 0
 		dir := file.Flags&protocol.FlagDirectory != 0
 		prm := file.Flags & 0777
-		log.Printf("File: %q, Del: %v, Inv: %v, Dir: %v, Perm: 0%03o, Modified: %d, Blocks: %d",
-			file.Name, del, inv, dir, prm, file.Modified, len(file.Blocks))
+		log.Printf("File: %q, Ver:%d, Del: %v, Inv: %v, Dir: %v, Perm: 0%03o, Modified: %d, Blocks: %d",
+			file.Name, file.Version, del, inv, dir, prm, file.Modified, len(file.Blocks))
 		if *showBlocks {
 			for _, block := range file.Blocks {
 				log.Printf("   Size: %6d, Hash: %x", block.Size, block.Hash)

+ 11 - 0
cmd/syncthing/gui.go

@@ -51,6 +51,7 @@ func startGUI(cfg config.GUIConfiguration, m *model.Model) error {
 	router.Get("/", getRoot)
 	router.Get("/rest/version", restGetVersion)
 	router.Get("/rest/model", restGetModel)
+	router.Get("/rest/need", restGetNeed)
 	router.Get("/rest/connections", restGetConnections)
 	router.Get("/rest/config", restGetConfig)
 	router.Get("/rest/config/sync", restGetConfigInSync)
@@ -125,6 +126,16 @@ func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(res)
 }
 
+func restGetNeed(m *model.Model, w http.ResponseWriter, r *http.Request) {
+	var qs = r.URL.Query()
+	var repo = qs.Get("repo")
+
+	files := m.NeedFilesRepo(repo)
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(files)
+}
+
 func restGetConnections(m *model.Model, w http.ResponseWriter) {
 	var res = m.ConnectionStats()
 	w.Header().Set("Content-Type", "application/json")

+ 2 - 6
files/set.go

@@ -116,12 +116,8 @@ func (m *Set) Need(id uint) []scanner.File {
 			continue
 		}
 
-		file := gf.File
-		switch {
-		case file.Flags&protocol.FlagDirectory == 0 && gk.newerThan(rkID[gk.Name]):
-			fs = append(fs, file)
-		case file.Flags&(protocol.FlagDirectory|protocol.FlagDeleted) == protocol.FlagDirectory && gk.newerThan(rkID[gk.Name]):
-			fs = append(fs, file)
+		if gk.newerThan(rkID[gk.Name]) {
+			fs = append(fs, gf.File)
 		}
 	}
 	m.Unlock()

+ 1 - 0
integration/.gitignore

@@ -11,3 +11,4 @@ md5r
 json
 *.idx.gz
 dirs-*
+*.out

+ 7 - 4
integration/folders.sh

@@ -4,15 +4,13 @@ iterations=${1:-5}
 
 id1=I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA
 id2=JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ
-id3=373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA
 
 go build json.go
 
 start() {
 	echo "Starting..."
-	for i in 1 2 ; do
-		STTRACE=linenumbers STPROFILER=":909$i" syncthing -home "f$i" &
-	done
+	STTRACE=model,scanner STPROFILER=":9091" syncthing -home "f1" > 1.out 2>&1 &
+	STTRACE=model,scanner STPROFILER=":9092" syncthing -home "f2" > 2.out 2>&1 &
 }
 
 stop() {
@@ -44,6 +42,11 @@ testConvergence() {
 		tot=$(($s1comp + $s2comp))
 		echo $tot / 200
 		if [[ $tot == 200 ]] ; then
+			# when fixing up directories, a node will announce completion
+			# slightly before it's actually complete. this is arguably a bug,
+			# but we let it slide for the moment as long as it gets there
+			# eventually.
+			sleep 5
 			break
 		fi
 	done

+ 13 - 6
model/blockqueue.go

@@ -1,7 +1,7 @@
 package model
 
 import (
-	"sync/atomic"
+	"sync"
 
 	"github.com/calmh/syncthing/scanner"
 )
@@ -24,7 +24,8 @@ type blockQueue struct {
 	outbox chan bqBlock
 
 	queued []bqBlock
-	qlen   uint32
+
+	mut sync.Mutex
 }
 
 func newBlockQueue() *blockQueue {
@@ -37,6 +38,9 @@ func newBlockQueue() *blockQueue {
 }
 
 func (q *blockQueue) addBlock(a bqAdd) {
+	q.mut.Lock()
+	defer q.mut.Unlock()
+
 	// If we already have it queued, return
 	for _, b := range q.queued {
 		if b.file.Name == a.file.Name {
@@ -74,15 +78,18 @@ func (q *blockQueue) run() {
 		if len(q.queued) == 0 {
 			q.addBlock(<-q.inbox)
 		} else {
+			q.mut.Lock()
 			next := q.queued[0]
+			q.mut.Unlock()
 			select {
 			case a := <-q.inbox:
 				q.addBlock(a)
 			case q.outbox <- next:
+				q.mut.Lock()
 				q.queued = q.queued[1:]
+				q.mut.Unlock()
 			}
 		}
-		atomic.StoreUint32(&q.qlen, uint32(len(q.queued)))
 	}
 }
 
@@ -95,7 +102,7 @@ func (q *blockQueue) get() bqBlock {
 }
 
 func (q *blockQueue) empty() bool {
-	var l uint32
-	atomic.LoadUint32(&l)
-	return l == 0
+	q.mut.Lock()
+	defer q.mut.Unlock()
+	return len(q.queued) == 0
 }

+ 22 - 6
model/model.go

@@ -221,7 +221,7 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
 	return 0, 0, 0
 }
 
-// NeedFiles returns the list of currently needed files and the total size.
+// NeedSize returns the number and total size of currently needed files.
 func (m *Model) NeedSize(repo string) (files int, bytes int64) {
 	f, d, b := sizeOf(m.NeedFilesRepo(repo))
 	return f + d, b
@@ -241,13 +241,21 @@ func (m *Model) NeedFilesRepo(repo string) []scanner.File {
 // Implements the protocol.Model interface.
 func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) {
 	if debug {
-		l.Debugf("IDX(in): %s / %q: %d files", nodeID, repo, len(fs))
+		l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
 	}
 
 	var files = make([]scanner.File, len(fs))
 	for i := range fs {
-		lamport.Default.Tick(fs[i].Version)
-		files[i] = fileFromFileInfo(fs[i])
+		f := fs[i]
+		lamport.Default.Tick(f.Version)
+		if debug {
+			var flagComment string
+			if f.Flags&protocol.FlagDeleted != 0 {
+				flagComment = " (deleted)"
+			}
+			l.Debugf("IDX(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
+		}
+		files[i] = fileFromFileInfo(f)
 	}
 
 	id := m.cm.Get(nodeID)
@@ -269,8 +277,16 @@ func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo)
 
 	var files = make([]scanner.File, len(fs))
 	for i := range fs {
-		lamport.Default.Tick(fs[i].Version)
-		files[i] = fileFromFileInfo(fs[i])
+		f := fs[i]
+		lamport.Default.Tick(f.Version)
+		if debug {
+			var flagComment string
+			if f.Flags&protocol.FlagDeleted != 0 {
+				flagComment = " (deleted)"
+			}
+			l.Debugf("IDXUP(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
+		}
+		files[i] = fileFromFileInfo(f)
 	}
 
 	id := m.cm.Get(nodeID)

+ 64 - 20
model/puller.go

@@ -207,7 +207,9 @@ func (p *puller) runRO() {
 
 func (p *puller) fixupDirectories() {
 	var deleteDirs []string
-	filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error {
+	var changed = 0
+
+	var walkFn = func(path string, info os.FileInfo, err error) error {
 		if !info.IsDir() {
 			return nil
 		}
@@ -221,9 +223,12 @@ func (p *puller) fixupDirectories() {
 			return nil
 		}
 
-		cur := p.model.CurrentGlobalFile(p.repo, rn)
+		cur := p.model.CurrentRepoFile(p.repo, rn)
 		if cur.Name != rn {
 			// No matching dir in current list; weird
+			if debug {
+				l.Debugf("missing dir: %s; %v", rn, cur)
+			}
 			return nil
 		}
 
@@ -241,31 +246,59 @@ func (p *puller) fixupDirectories() {
 		}
 
 		if cur.Flags&uint32(os.ModePerm) != uint32(info.Mode()&os.ModePerm) {
-			os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
-			if debug {
-				l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
+			err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
+			if err != nil {
+				l.Warnln("Restoring folder flags: %q: %v", path, err)
+			} else {
+				changed++
+				if debug {
+					l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
+				}
 			}
 		}
 
 		if cur.Modified != info.ModTime().Unix() {
 			t := time.Unix(cur.Modified, 0)
-			os.Chtimes(path, t, t)
-			if debug {
-				l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
+			err := os.Chtimes(path, t, t)
+			if err != nil {
+				l.Warnln("Restoring folder modtime: %q: %v", path, err)
+			} else {
+				changed++
+				if debug {
+					l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
+				}
 			}
 		}
 
 		return nil
-	})
+	}
+
+	for {
+		deleteDirs = nil
+		changed = 0
+		filepath.Walk(p.dir, walkFn)
+
+		var deleted = 0
+		// Delete any queued directories
+		for i := len(deleteDirs) - 1; i >= 0; i-- {
+			dir := deleteDirs[i]
+			if debug {
+				l.Debugln("delete dir:", dir)
+			}
+			err := os.Remove(dir)
+			if err != nil {
+				l.Warnln(err)
+			} else {
+				deleted++
+			}
+		}
 
-	// Delete any queued directories
-	for i := len(deleteDirs) - 1; i >= 0; i-- {
 		if debug {
-			l.Debugln("delete dir:", deleteDirs[i])
+			l.Debugf("changed %d, deleted %d dirs", changed, deleted)
 		}
-		err := os.Remove(deleteDirs[i])
-		if err != nil {
-			l.Warnln(err)
+
+		if changed+deleted == 0 {
+			return
 		}
 	}
 }
@@ -301,12 +334,23 @@ func (p *puller) handleRequestResult(res requestResult) {
 func (p *puller) handleBlock(b bqBlock) bool {
 	f := b.file
 
-	// For directories, simply making sure they exist is enough
+	// For directories, making sure they exist is enough.
+	// Deleted directories we mark as handled and delete later.
 	if f.Flags&protocol.FlagDirectory != 0 {
-		path := filepath.Join(p.dir, f.Name)
-		_, err := os.Stat(path)
-		if err != nil && os.IsNotExist(err) {
-			os.MkdirAll(path, 0777)
+		if f.Flags&protocol.FlagDeleted == 0 {
+			path := filepath.Join(p.dir, f.Name)
+			_, err := os.Stat(path)
+			if err != nil && os.IsNotExist(err) {
+				if debug {
+					l.Debugf("create dir: %v", f)
+				}
+				err = os.MkdirAll(path, 0777)
+				if err != nil {
+					l.Warnf("Create folder: %q: %v", path, err)
+				}
+			}
+		} else if debug {
+			l.Debugf("ignore delete dir: %v", f)
 		}
 		p.model.updateLocal(p.repo, f)
 		return true