Browse Source

Merge pull request #2468 from calmh/removefolder

Remove folder without restart (fixes #2262)
Audrius Butkevicius 10 years ago
parent
commit
8fb7f40a6a
3 changed files with 160 additions and 64 deletions
  1. 101 44
      lib/model/model.go
  2. 28 14
      lib/scanner/blockqueue.go
  3. 31 6
      lib/scanner/walk.go

+ 101 - 44
lib/model/model.go

@@ -75,15 +75,16 @@ type Model struct {
 	clientName    string
 	clientVersion string
 
-	folderCfgs     map[string]config.FolderConfiguration                  // folder -> cfg
-	folderFiles    map[string]*db.FileSet                                 // folder -> files
-	folderDevices  map[string][]protocol.DeviceID                         // folder -> deviceIDs
-	deviceFolders  map[protocol.DeviceID][]string                         // deviceID -> folders
-	deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef
-	folderIgnores  map[string]*ignore.Matcher                             // folder -> matcher object
-	folderRunners  map[string]service                                     // folder -> puller or scanner
-	folderStatRefs map[string]*stats.FolderStatisticsReference            // folder -> statsRef
-	fmut           sync.RWMutex                                           // protects the above
+	folderCfgs         map[string]config.FolderConfiguration                  // folder -> cfg
+	folderFiles        map[string]*db.FileSet                                 // folder -> files
+	folderDevices      map[string][]protocol.DeviceID                         // folder -> deviceIDs
+	deviceFolders      map[protocol.DeviceID][]string                         // deviceID -> folders
+	deviceStatRefs     map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef
+	folderIgnores      map[string]*ignore.Matcher                             // folder -> matcher object
+	folderRunners      map[string]service                                     // folder -> puller or scanner
+	folderRunnerTokens map[string][]suture.ServiceToken                       // folder -> tokens for puller or scanner
+	folderStatRefs     map[string]*stats.FolderStatisticsReference            // folder -> statsRef
+	fmut               sync.RWMutex                                           // protects the above
 
 	conn         map[protocol.DeviceID]Connection
 	deviceVer    map[protocol.DeviceID]string
@@ -105,28 +106,29 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
 				l.Debugln(line)
 			},
 		}),
-		cfg:               cfg,
-		db:                ldb,
-		finder:            db.NewBlockFinder(ldb),
-		progressEmitter:   NewProgressEmitter(cfg),
-		id:                id,
-		shortID:           id.Short(),
-		cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles,
-		protectedFiles:    protectedFiles,
-		deviceName:        deviceName,
-		clientName:        clientName,
-		clientVersion:     clientVersion,
-		folderCfgs:        make(map[string]config.FolderConfiguration),
-		folderFiles:       make(map[string]*db.FileSet),
-		folderDevices:     make(map[string][]protocol.DeviceID),
-		deviceFolders:     make(map[protocol.DeviceID][]string),
-		deviceStatRefs:    make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
-		folderIgnores:     make(map[string]*ignore.Matcher),
-		folderRunners:     make(map[string]service),
-		folderStatRefs:    make(map[string]*stats.FolderStatisticsReference),
-		conn:              make(map[protocol.DeviceID]Connection),
-		deviceVer:         make(map[protocol.DeviceID]string),
-		devicePaused:      make(map[protocol.DeviceID]bool),
+		cfg:                cfg,
+		db:                 ldb,
+		finder:             db.NewBlockFinder(ldb),
+		progressEmitter:    NewProgressEmitter(cfg),
+		id:                 id,
+		shortID:            id.Short(),
+		cacheIgnoredFiles:  cfg.Options().CacheIgnoredFiles,
+		protectedFiles:     protectedFiles,
+		deviceName:         deviceName,
+		clientName:         clientName,
+		clientVersion:      clientVersion,
+		folderCfgs:         make(map[string]config.FolderConfiguration),
+		folderFiles:        make(map[string]*db.FileSet),
+		folderDevices:      make(map[string][]protocol.DeviceID),
+		deviceFolders:      make(map[protocol.DeviceID][]string),
+		deviceStatRefs:     make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
+		folderIgnores:      make(map[string]*ignore.Matcher),
+		folderRunners:      make(map[string]service),
+		folderRunnerTokens: make(map[string][]suture.ServiceToken),
+		folderStatRefs:     make(map[string]*stats.FolderStatisticsReference),
+		conn:               make(map[protocol.DeviceID]Connection),
+		deviceVer:          make(map[protocol.DeviceID]string),
+		devicePaused:       make(map[protocol.DeviceID]bool),
 
 		fmut: sync.NewRWMutex(),
 		pmut: sync.NewRWMutex(),
@@ -163,7 +165,6 @@ func (m *Model) StartFolderRW(folder string) {
 	}
 	p := newRWFolder(m, m.shortID, cfg)
 	m.folderRunners[folder] = p
-	m.fmut.Unlock()
 
 	if len(cfg.Versioning.Type) > 0 {
 		factory, ok := versioner.Factories[cfg.Versioning.Type]
@@ -176,14 +177,17 @@ func (m *Model) StartFolderRW(folder string) {
 			// The versioner implements the suture.Service interface, so
 			// expects to be run in the background in addition to being called
 			// when files are going to be archived.
-			m.Add(service)
+			token := m.Add(service)
+			m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token)
 		}
 		p.versioner = versioner
 	}
 
 	m.warnAboutOverwritingProtectedFiles(folder)
 
-	m.Add(p)
+	token := m.Add(p)
+	m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token)
+	m.fmut.Unlock()
 
 	l.Okln("Ready to synchronize", folder, "(read-write)")
 }
@@ -232,13 +236,49 @@ func (m *Model) StartFolderRO(folder string) {
 	}
 	s := newROFolder(m, folder, time.Duration(cfg.RescanIntervalS)*time.Second)
 	m.folderRunners[folder] = s
-	m.fmut.Unlock()
 
-	m.Add(s)
+	token := m.Add(s)
+	m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token)
+	m.fmut.Unlock()
 
 	l.Okln("Ready to synchronize", folder, "(read only; no external updates accepted)")
 }
 
+func (m *Model) RemoveFolder(folder string) {
+	m.fmut.Lock()
+	m.pmut.Lock()
+
+	// Stop the services running for this folder
+	for _, id := range m.folderRunnerTokens[folder] {
+		m.Remove(id)
+	}
+
+	// Close connections to affected devices
+	for _, dev := range m.folderDevices[folder] {
+		if conn, ok := m.conn[dev]; ok {
+			closeRawConn(conn)
+		}
+	}
+
+	// Clean up our config maps
+	delete(m.folderCfgs, folder)
+	delete(m.folderFiles, folder)
+	delete(m.folderDevices, folder)
+	delete(m.folderIgnores, folder)
+	delete(m.folderRunners, folder)
+	delete(m.folderRunnerTokens, folder)
+	delete(m.folderStatRefs, folder)
+	for dev, folders := range m.deviceFolders {
+		m.deviceFolders[dev] = stringSliceWithout(folders, folder)
+	}
+
+	// Remove it from the database
+	db.DropFolder(m.db, folder)
+
+	m.pmut.Unlock()
+	m.fmut.Unlock()
+}
+
 type ConnectionInfo struct {
 	protocol.Statistics
 	Connected     bool
@@ -1250,6 +1290,11 @@ nextSub:
 	}
 	subs = unifySubs
 
+	// The cancel channel is closed whenever we return (such as from an error),
+	// to signal the potentially still running walker to stop.
+	cancel := make(chan struct{})
+	defer close(cancel)
+
 	w := &scanner.Walker{
 		Folder:                folderCfg.ID,
 		Dir:                   folderCfg.Path(),
@@ -1265,6 +1310,7 @@ nextSub:
 		Hashers:               m.numHashers(folder),
 		ShortID:               m.shortID,
 		ProgressTickIntervalS: folderCfg.ScanProgressIntervalS,
+		Cancel:                cancel,
 	}
 
 	runner.setState(FolderScanning)
@@ -1674,17 +1720,17 @@ func (m *Model) BringToFront(folder, file string) {
 // CheckFolderHealth checks the folder for common errors and returns the
 // current folder error, or nil if the folder is healthy.
 func (m *Model) CheckFolderHealth(id string) error {
+	folder, ok := m.cfg.Folders()[id]
+	if !ok {
+		return errors.New("folder does not exist")
+	}
+
 	if minFree := m.cfg.Options().MinHomeDiskFreePct; minFree > 0 {
 		if free, err := osutil.DiskFreePercentage(m.cfg.ConfigPath()); err == nil && free < minFree {
 			return errors.New("home disk has insufficient free space")
 		}
 	}
 
-	folder, ok := m.cfg.Folders()[id]
-	if !ok {
-		return errors.New("folder does not exist")
-	}
-
 	fi, err := os.Stat(folder.Path())
 
 	v, ok := m.CurrentLocalVersion(id)
@@ -1797,9 +1843,9 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool {
 	for folderID, fromCfg := range fromFolders {
 		toCfg, ok := toFolders[folderID]
 		if !ok {
-			// A folder was removed. Requires restart.
-			l.Debugln(m, "requires restart, removing folder", folderID)
-			return false
+			// The folder was removed.
+			m.RemoveFolder(folderID)
+			continue
 		}
 
 		// This folder exists on both sides. Compare the device lists, as we
@@ -1961,3 +2007,14 @@ func closeRawConn(conn io.Closer) error {
 	}
 	return conn.Close()
 }
+
+func stringSliceWithout(ss []string, s string) []string {
+	for i := range ss {
+		if ss[i] == s {
+			copy(ss[i:], ss[i+1:])
+			ss = ss[:len(ss)-1]
+			return ss
+		}
+	}
+	return ss
+}

+ 28 - 14
lib/scanner/blockqueue.go

@@ -19,13 +19,13 @@ import (
 // workers are used in parallel. The outbox will become closed when the inbox
 // is closed and all items handled.
 
-func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter *int64, done chan struct{}) {
+func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter *int64, done, cancel chan struct{}) {
 	wg := sync.NewWaitGroup()
 	wg.Add(workers)
 
 	for i := 0; i < workers; i++ {
 		go func() {
-			hashFiles(dir, blockSize, outbox, inbox, counter)
+			hashFiles(dir, blockSize, outbox, inbox, counter, cancel)
 			wg.Done()
 		}()
 	}
@@ -59,19 +59,33 @@ func HashFile(path string, blockSize int, sizeHint int64, counter *int64) ([]pro
 	return Blocks(fd, blockSize, sizeHint, counter)
 }
 
-func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter *int64) {
-	for f := range inbox {
-		if f.IsDirectory() || f.IsDeleted() {
-			panic("Bug. Asked to hash a directory or a deleted file.")
-		}
+func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter *int64, cancel chan struct{}) {
+	for {
+		select {
+		case f, ok := <-inbox:
+			if !ok {
+				return
+			}
 
-		blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, f.CachedSize, counter)
-		if err != nil {
-			l.Debugln("hash error:", f.Name, err)
-			continue
-		}
+			if f.IsDirectory() || f.IsDeleted() {
+				panic("Bug. Asked to hash a directory or a deleted file.")
+			}
 
-		f.Blocks = blocks
-		outbox <- f
+			blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, f.CachedSize, counter)
+			if err != nil {
+				l.Debugln("hash error:", f.Name, err)
+				continue
+			}
+
+			f.Blocks = blocks
+			select {
+			case outbox <- f:
+			case <-cancel:
+				return
+			}
+
+		case <-cancel:
+			return
+		}
 	}
 }

+ 31 - 6
lib/scanner/walk.go

@@ -72,6 +72,8 @@ type Walker struct {
 	// Optional progress tick interval which defines how often FolderScanProgress
 	// events are emitted. Negative number means disabled.
 	ProgressTickIntervalS int
+	// Signals cancel from the outside - when closed, we should stop walking.
+	Cancel chan struct{}
 }
 
 type TempNamer interface {
@@ -121,7 +123,7 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
 	// We're not required to emit scan progress events, just kick off hashers,
 	// and feed inputs directly from the walker.
 	if w.ProgressTickIntervalS < 0 {
-		newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil)
+		newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil, w.Cancel)
 		return finishedChan, nil
 	}
 
@@ -149,7 +151,7 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
 
 		realToHashChan := make(chan protocol.FileInfo)
 		done := make(chan struct{})
-		newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, &progress, done)
+		newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, &progress, done, w.Cancel)
 
 		// A routine which actually emits the FolderScanProgress events
 		// every w.ProgressTicker ticks, until the hasher routines terminate.
@@ -168,13 +170,21 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
 						"current": current,
 						"total":   total,
 					})
+				case <-w.Cancel:
+					ticker.Stop()
+					return
 				}
 			}
 		}()
 
+	loop:
 		for _, file := range filesToHash {
 			l.Debugln("real to hash:", file.Name)
-			realToHashChan <- file
+			select {
+			case realToHashChan <- file:
+			case <-w.Cancel:
+				break loop
+			}
 		}
 		close(realToHashChan)
 	}()
@@ -329,7 +339,11 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath.
 
 			l.Debugln("symlink changedb:", p, f)
 
-			dchan <- f
+			select {
+			case dchan <- f:
+			case <-w.Cancel:
+				return errors.New("cancelled")
+			}
 
 			return skip
 		}
@@ -363,7 +377,13 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath.
 				Modified: mtime.Unix(),
 			}
 			l.Debugln("dir:", p, f)
-			dchan <- f
+
+			select {
+			case dchan <- f:
+			case <-w.Cancel:
+				return errors.New("cancelled")
+			}
+
 			return nil
 		}
 
@@ -406,7 +426,12 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath.
 				CachedSize: info.Size(),
 			}
 			l.Debugln("to hash:", p, f)
-			fchan <- f
+
+			select {
+			case fchan <- f:
+			case <-w.Cancel:
+				return errors.New("cancelled")
+			}
 		}
 
 		return nil