|
@@ -149,10 +149,12 @@ func (f *sendReceiveFolder) pull() bool {
|
|
|
|
|
|
// If there is nothing to do, don't even enter pulling state.
|
|
|
abort := true
|
|
|
- f.fset.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
|
|
|
+ snap := f.fset.Snapshot()
|
|
|
+ snap.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
|
|
|
abort = false
|
|
|
return false
|
|
|
})
|
|
|
+ snap.Release()
|
|
|
if abort {
|
|
|
return true
|
|
|
}
|
|
@@ -234,6 +236,9 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
|
|
|
f.pullErrors = make(map[string]string)
|
|
|
f.pullErrorsMut.Unlock()
|
|
|
|
|
|
+ snap := f.fset.Snapshot()
|
|
|
+ defer snap.Release()
|
|
|
+
|
|
|
pullChan := make(chan pullBlockState)
|
|
|
copyChan := make(chan copyBlocksState)
|
|
|
finisherChan := make(chan *sharedPullerState)
|
|
@@ -272,11 +277,11 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
|
|
|
doneWg.Add(1)
|
|
|
// finisherRoutine finishes when finisherChan is closed
|
|
|
go func() {
|
|
|
- f.finisherRoutine(finisherChan, dbUpdateChan, scanChan)
|
|
|
+ f.finisherRoutine(snap, finisherChan, dbUpdateChan, scanChan)
|
|
|
doneWg.Done()
|
|
|
}()
|
|
|
|
|
|
- changed, fileDeletions, dirDeletions, err := f.processNeeded(dbUpdateChan, copyChan, scanChan)
|
|
|
+ changed, fileDeletions, dirDeletions, err := f.processNeeded(snap, dbUpdateChan, copyChan, scanChan)
|
|
|
|
|
|
// Signal copy and puller routines that we are done with the in data for
|
|
|
// this iteration. Wait for them to finish.
|
|
@@ -291,7 +296,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
|
|
|
doneWg.Wait()
|
|
|
|
|
|
if err == nil {
|
|
|
- f.processDeletions(fileDeletions, dirDeletions, dbUpdateChan, scanChan)
|
|
|
+ f.processDeletions(fileDeletions, dirDeletions, snap, dbUpdateChan, scanChan)
|
|
|
}
|
|
|
|
|
|
// Wait for db updates and scan scheduling to complete
|
|
@@ -307,7 +312,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
|
|
|
return changed
|
|
|
}
|
|
|
|
|
|
-func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
|
|
|
+func (f *sendReceiveFolder) processNeeded(snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
|
|
|
changed := 0
|
|
|
var dirDeletions []protocol.FileInfo
|
|
|
fileDeletions := map[string]protocol.FileInfo{}
|
|
@@ -317,7 +322,7 @@ func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyC
|
|
|
// Regular files to pull goes into the file queue, everything else
|
|
|
// (directories, symlinks and deletes) goes into the "process directly"
|
|
|
// pile.
|
|
|
- f.fset.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
|
|
|
+ snap.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
|
|
|
select {
|
|
|
case <-f.ctx.Done():
|
|
|
return false
|
|
@@ -359,9 +364,9 @@ func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyC
|
|
|
// files to delete inside them before we get to that point.
|
|
|
dirDeletions = append(dirDeletions, file)
|
|
|
} else if file.IsSymlink() {
|
|
|
- f.deleteFile(file, dbUpdateChan, scanChan)
|
|
|
+ f.deleteFile(file, snap, dbUpdateChan, scanChan)
|
|
|
} else {
|
|
|
- df, ok := f.fset.Get(protocol.LocalDeviceID, file.Name)
|
|
|
+ df, ok := snap.Get(protocol.LocalDeviceID, file.Name)
|
|
|
// Local file can be already deleted, but with a lower version
|
|
|
// number, hence the deletion coming in again as part of
|
|
|
// WithNeed, furthermore, the file can simply be of the wrong
|
|
@@ -377,7 +382,7 @@ func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyC
|
|
|
}
|
|
|
|
|
|
case file.Type == protocol.FileInfoTypeFile:
|
|
|
- curFile, hasCurFile := f.fset.Get(protocol.LocalDeviceID, file.Name)
|
|
|
+ curFile, hasCurFile := snap.Get(protocol.LocalDeviceID, file.Name)
|
|
|
if _, need := blockDiff(curFile.Blocks, file.Blocks); hasCurFile && len(need) == 0 {
|
|
|
// We are supposed to copy the entire file, and then fetch nothing. We
|
|
|
// are only updating metadata, so we don't actually *need* to make the
|
|
@@ -396,13 +401,13 @@ func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyC
|
|
|
case file.IsDirectory() && !file.IsSymlink():
|
|
|
l.Debugln(f, "Handling directory", file.Name)
|
|
|
if f.checkParent(file.Name, scanChan) {
|
|
|
- f.handleDir(file, dbUpdateChan, scanChan)
|
|
|
+ f.handleDir(file, snap, dbUpdateChan, scanChan)
|
|
|
}
|
|
|
|
|
|
case file.IsSymlink():
|
|
|
l.Debugln(f, "Handling symlink", file.Name)
|
|
|
if f.checkParent(file.Name, scanChan) {
|
|
|
- f.handleSymlink(file, dbUpdateChan, scanChan)
|
|
|
+ f.handleSymlink(file, snap, dbUpdateChan, scanChan)
|
|
|
}
|
|
|
|
|
|
default:
|
|
@@ -451,7 +456,7 @@ nextFile:
|
|
|
break
|
|
|
}
|
|
|
|
|
|
- fi, ok := f.fset.GetGlobal(fileName)
|
|
|
+ fi, ok := snap.GetGlobal(fileName)
|
|
|
if !ok {
|
|
|
// File is no longer in the index. Mark it as done and drop it.
|
|
|
f.queue.Done(fileName)
|
|
@@ -484,7 +489,7 @@ nextFile:
|
|
|
// desired state with the delete bit set is in the deletion
|
|
|
// map.
|
|
|
desired := fileDeletions[candidate.Name]
|
|
|
- if err := f.renameFile(candidate, desired, fi, dbUpdateChan, scanChan); err != nil {
|
|
|
+ if err := f.renameFile(candidate, desired, fi, snap, dbUpdateChan, scanChan); err != nil {
|
|
|
// Failed to rename, try to handle files as separate
|
|
|
// deletions and updates.
|
|
|
break
|
|
@@ -498,11 +503,11 @@ nextFile:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- devices := f.fset.Availability(fileName)
|
|
|
+ devices := snap.Availability(fileName)
|
|
|
for _, dev := range devices {
|
|
|
if _, ok := f.model.Connection(dev); ok {
|
|
|
// Handle the file normally, by coping and pulling, etc.
|
|
|
- f.handleFile(fi, copyChan, dbUpdateChan)
|
|
|
+ f.handleFile(fi, snap, copyChan)
|
|
|
continue nextFile
|
|
|
}
|
|
|
}
|
|
@@ -513,7 +518,7 @@ nextFile:
|
|
|
return changed, fileDeletions, dirDeletions, nil
|
|
|
}
|
|
|
|
|
|
-func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
+func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
for _, file := range fileDeletions {
|
|
|
select {
|
|
|
case <-f.ctx.Done():
|
|
@@ -521,7 +526,7 @@ func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.F
|
|
|
default:
|
|
|
}
|
|
|
|
|
|
- f.deleteFile(file, dbUpdateChan, scanChan)
|
|
|
+ f.deleteFile(file, snap, dbUpdateChan, scanChan)
|
|
|
}
|
|
|
|
|
|
// Process in reverse order to delete depth first
|
|
@@ -534,12 +539,12 @@ func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.F
|
|
|
|
|
|
dir := dirDeletions[len(dirDeletions)-i-1]
|
|
|
l.Debugln(f, "Deleting dir", dir.Name)
|
|
|
- f.deleteDir(dir, dbUpdateChan, scanChan)
|
|
|
+ f.deleteDir(dir, snap, dbUpdateChan, scanChan)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// handleDir creates or updates the given directory
|
|
|
-func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
+func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
// Used in the defer closure below, updated by the function body. Take
|
|
|
// care not declare another err.
|
|
|
var err error
|
|
@@ -567,7 +572,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<
|
|
|
}
|
|
|
|
|
|
if shouldDebug() {
|
|
|
- curFile, _ := f.fset.Get(protocol.LocalDeviceID, file.Name)
|
|
|
+ curFile, _ := snap.Get(protocol.LocalDeviceID, file.Name)
|
|
|
l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
|
|
|
}
|
|
|
|
|
@@ -598,7 +603,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<
|
|
|
return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
|
|
|
}, curFile.Name)
|
|
|
} else {
|
|
|
- err = f.deleteItemOnDisk(curFile, scanChan)
|
|
|
+ err = f.deleteItemOnDisk(curFile, snap, scanChan)
|
|
|
}
|
|
|
if err != nil {
|
|
|
f.newPullError(file.Name, err)
|
|
@@ -693,7 +698,7 @@ func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) boo
|
|
|
}
|
|
|
|
|
|
// handleSymlink creates or updates the given symlink
|
|
|
-func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
+func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
// Used in the defer closure below, updated by the function body. Take
|
|
|
// care not declare another err.
|
|
|
var err error
|
|
@@ -716,7 +721,7 @@ func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan c
|
|
|
}()
|
|
|
|
|
|
if shouldDebug() {
|
|
|
- curFile, _ := f.fset.Get(protocol.LocalDeviceID, file.Name)
|
|
|
+ curFile, _ := snap.Get(protocol.LocalDeviceID, file.Name)
|
|
|
l.Debugf("need symlink\n\t%v\n\t%v", file, curFile)
|
|
|
}
|
|
|
|
|
@@ -750,7 +755,7 @@ func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan c
|
|
|
return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
|
|
|
}, curFile.Name)
|
|
|
} else {
|
|
|
- err = f.deleteItemOnDisk(curFile, scanChan)
|
|
|
+ err = f.deleteItemOnDisk(curFile, snap, scanChan)
|
|
|
}
|
|
|
if err != nil {
|
|
|
f.newPullError(file.Name, errors.Wrap(err, "symlink remove"))
|
|
@@ -775,7 +780,7 @@ func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan c
|
|
|
}
|
|
|
|
|
|
// deleteDir attempts to remove a directory that was deleted on a remote
|
|
|
-func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
+func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
// Used in the defer closure below, updated by the function body. Take
|
|
|
// care not declare another err.
|
|
|
var err error
|
|
@@ -797,7 +802,7 @@ func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, dbUpdateChan chan<
|
|
|
})
|
|
|
}()
|
|
|
|
|
|
- if err = f.deleteDirOnDisk(file.Name, scanChan); err != nil {
|
|
|
+ if err = f.deleteDirOnDisk(file.Name, snap, scanChan); err != nil {
|
|
|
f.newPullError(file.Name, errors.Wrap(err, "delete dir"))
|
|
|
return
|
|
|
}
|
|
@@ -806,8 +811,8 @@ func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, dbUpdateChan chan<
|
|
|
}
|
|
|
|
|
|
// deleteFile attempts to delete the given file
|
|
|
-func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
- cur, hasCur := f.fset.Get(protocol.LocalDeviceID, file.Name)
|
|
|
+func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
+ cur, hasCur := snap.Get(protocol.LocalDeviceID, file.Name)
|
|
|
f.deleteFileWithCurrent(file, cur, hasCur, dbUpdateChan, scanChan)
|
|
|
}
|
|
|
|
|
@@ -895,7 +900,7 @@ func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, h
|
|
|
|
|
|
// renameFile attempts to rename an existing file to a destination
|
|
|
// and set the right attributes on it.
|
|
|
-func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
|
|
|
+func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
|
|
|
// Used in the defer closure below, updated by the function body. Take
|
|
|
// care not declare another err.
|
|
|
var err error
|
|
@@ -937,7 +942,7 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db
|
|
|
return err
|
|
|
}
|
|
|
// Check that the target corresponds to what we have in the DB
|
|
|
- curTarget, ok := f.fset.Get(protocol.LocalDeviceID, target.Name)
|
|
|
+ curTarget, ok := snap.Get(protocol.LocalDeviceID, target.Name)
|
|
|
switch stat, serr := f.fs.Lstat(target.Name); {
|
|
|
case serr != nil && fs.IsNotExist(serr):
|
|
|
if !ok || curTarget.IsDeleted() {
|
|
@@ -994,7 +999,7 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db
|
|
|
// of the source and the creation of the target temp file. Fix-up the metadata,
|
|
|
// update the local index of the target file and rename from temp to real name.
|
|
|
|
|
|
- if err = f.performFinish(target, curTarget, true, tempName, dbUpdateChan, scanChan); err != nil {
|
|
|
+ if err = f.performFinish(target, curTarget, true, tempName, snap, dbUpdateChan, scanChan); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
@@ -1039,8 +1044,8 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db
|
|
|
|
|
|
// handleFile queues the copies and pulls as necessary for a single new or
|
|
|
// changed file.
|
|
|
-func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, dbUpdateChan chan<- dbUpdateJob) {
|
|
|
- curFile, hasCurFile := f.fset.Get(protocol.LocalDeviceID, file.Name)
|
|
|
+func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot, copyChan chan<- copyBlocksState) {
|
|
|
+ curFile, hasCurFile := snap.Get(protocol.LocalDeviceID, file.Name)
|
|
|
|
|
|
have, _ := blockDiff(curFile.Blocks, file.Blocks)
|
|
|
|
|
@@ -1493,7 +1498,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
|
|
|
out <- state.sharedPullerState
|
|
|
}
|
|
|
|
|
|
-func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
|
|
|
+func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
|
|
|
// Set the correct permission bits on the new file
|
|
|
if !f.IgnorePerms && !file.NoPermissions {
|
|
|
if err := f.fs.Chmod(tempName, fs.FileMode(file.Permissions&0777)); err != nil {
|
|
@@ -1528,7 +1533,7 @@ func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCu
|
|
|
return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
|
|
|
}, curFile.Name)
|
|
|
} else {
|
|
|
- err = f.deleteItemOnDisk(curFile, scanChan)
|
|
|
+ err = f.deleteItemOnDisk(curFile, snap, scanChan)
|
|
|
}
|
|
|
if err != nil {
|
|
|
return err
|
|
@@ -1549,7 +1554,7 @@ func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCu
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
+func (f *sendReceiveFolder) finisherRoutine(snap *db.Snapshot, in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
for state := range in {
|
|
|
if closed, err := state.finalClose(); closed {
|
|
|
l.Debugln(f, "closing", state.file.Name)
|
|
@@ -1557,7 +1562,7 @@ func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpda
|
|
|
f.queue.Done(state.file.Name)
|
|
|
|
|
|
if err == nil {
|
|
|
- err = f.performFinish(state.file, state.curFile, state.hasCurFile, state.tempName, dbUpdateChan, scanChan)
|
|
|
+ err = f.performFinish(state.file, state.curFile, state.hasCurFile, state.tempName, snap, dbUpdateChan, scanChan)
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
@@ -1809,7 +1814,7 @@ func (f *sendReceiveFolder) Errors() []FileError {
|
|
|
}
|
|
|
|
|
|
// deleteItemOnDisk deletes the file represented by old that is about to be replaced by new.
|
|
|
-func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, scanChan chan<- string) (err error) {
|
|
|
+func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, snap *db.Snapshot, scanChan chan<- string) (err error) {
|
|
|
defer func() {
|
|
|
err = errors.Wrap(err, contextRemovingOldItem)
|
|
|
}()
|
|
@@ -1818,7 +1823,7 @@ func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, scanChan ch
|
|
|
case item.IsDirectory():
|
|
|
// Directories aren't archived and need special treatment due
|
|
|
// to potential children.
|
|
|
- return f.deleteDirOnDisk(item.Name, scanChan)
|
|
|
+ return f.deleteDirOnDisk(item.Name, snap, scanChan)
|
|
|
|
|
|
case !item.IsSymlink() && f.versioner != nil:
|
|
|
// If we should use versioning, let the versioner archive the
|
|
@@ -1834,7 +1839,7 @@ func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, scanChan ch
|
|
|
|
|
|
// deleteDirOnDisk attempts to delete a directory. It checks for files/dirs inside
|
|
|
// the directory and removes them if possible or returns an error if it fails
|
|
|
-func (f *sendReceiveFolder) deleteDirOnDisk(dir string, scanChan chan<- string) error {
|
|
|
+func (f *sendReceiveFolder) deleteDirOnDisk(dir string, snap *db.Snapshot, scanChan chan<- string) error {
|
|
|
if err := osutil.TraversesSymlink(f.fs, filepath.Dir(dir)); err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -1853,7 +1858,7 @@ func (f *sendReceiveFolder) deleteDirOnDisk(dir string, scanChan chan<- string)
|
|
|
toBeDeleted = append(toBeDeleted, fullDirFile)
|
|
|
} else if f.ignores != nil && f.ignores.Match(fullDirFile).IsIgnored() {
|
|
|
hasIgnored = true
|
|
|
- } else if cf, ok := f.fset.Get(protocol.LocalDeviceID, fullDirFile); !ok || cf.IsDeleted() || cf.IsInvalid() {
|
|
|
+ } else if cf, ok := snap.Get(protocol.LocalDeviceID, fullDirFile); !ok || cf.IsDeleted() || cf.IsInvalid() {
|
|
|
// Something appeared in the dir that we either are not aware of
|
|
|
// at all, that we think should be deleted or that is invalid,
|
|
|
// but not currently ignored -> schedule scan. The scanChan
|