|
|
@@ -54,6 +54,19 @@ var (
|
|
|
errNoDevice = errors.New("no available source device")
|
|
|
)
|
|
|
|
|
|
+const (
|
|
|
+ dbUpdateHandleDir = iota
|
|
|
+ dbUpdateDeleteDir
|
|
|
+ dbUpdateHandleFile
|
|
|
+ dbUpdateDeleteFile
|
|
|
+ dbUpdateShortcutFile
|
|
|
+)
|
|
|
+
|
|
|
+type dbUpdateJob struct {
|
|
|
+ file protocol.FileInfo
|
|
|
+ jobType int
|
|
|
+}
|
|
|
+
|
|
|
type rwFolder struct {
|
|
|
stateTracker
|
|
|
|
|
|
@@ -73,7 +86,7 @@ type rwFolder struct {
|
|
|
|
|
|
stop chan struct{}
|
|
|
queue *jobQueue
|
|
|
- dbUpdates chan protocol.FileInfo
|
|
|
+ dbUpdates chan dbUpdateJob
|
|
|
scanTimer *time.Timer
|
|
|
pullTimer *time.Timer
|
|
|
delayScan chan time.Duration
|
|
|
@@ -326,7 +339,7 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
|
|
|
l.Debugln(p, "c", p.copiers, "p", p.pullers)
|
|
|
}
|
|
|
|
|
|
- p.dbUpdates = make(chan protocol.FileInfo)
|
|
|
+ p.dbUpdates = make(chan dbUpdateJob)
|
|
|
updateWg.Add(1)
|
|
|
go func() {
|
|
|
// dbUpdaterRoutine finishes when p.dbUpdates is closed
|
|
|
@@ -583,7 +596,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) {
|
|
|
}
|
|
|
|
|
|
if err = osutil.InWritableDir(mkdir, realName); err == nil {
|
|
|
- p.dbUpdates <- file
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
|
|
|
} else {
|
|
|
l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
|
|
|
}
|
|
|
@@ -600,9 +613,9 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) {
|
|
|
// It's OK to change mode bits on stuff within non-writable directories.
|
|
|
|
|
|
if p.ignorePermissions(file) {
|
|
|
- p.dbUpdates <- file
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
|
|
|
} else if err := os.Chmod(realName, mode); err == nil {
|
|
|
- p.dbUpdates <- file
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
|
|
|
} else {
|
|
|
l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
|
|
|
}
|
|
|
@@ -642,13 +655,13 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo) {
|
|
|
err = osutil.InWritableDir(osutil.Remove, realName)
|
|
|
if err == nil || os.IsNotExist(err) {
|
|
|
// It was removed or it doesn't exist to start with
|
|
|
- p.dbUpdates <- file
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
|
|
|
} else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
|
|
|
// We get an error just looking at the directory, and it's not a
|
|
|
// permission problem. Lets assume the error is in fact some variant
|
|
|
// of "file does not exist" (possibly expressed as some parent being a
|
|
|
// file and not a directory etc) and that the delete is handled.
|
|
|
- p.dbUpdates <- file
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
|
|
|
} else {
|
|
|
l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
|
|
|
}
|
|
|
@@ -690,13 +703,13 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) {
|
|
|
|
|
|
if err == nil || os.IsNotExist(err) {
|
|
|
// It was removed or it doesn't exist to start with
|
|
|
- p.dbUpdates <- file
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
|
|
|
} else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
|
|
|
// We get an error just looking at the file, and it's not a permission
|
|
|
// problem. Lets assume the error is in fact some variant of "file
|
|
|
// does not exist" (possibly expressed as some parent being a file and
|
|
|
// not a directory etc) and that the delete is handled.
|
|
|
- p.dbUpdates <- file
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
|
|
|
} else {
|
|
|
l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
|
|
|
}
|
|
|
@@ -756,13 +769,15 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
|
|
|
// of the source and the creation of the target. Fix-up the metadata,
|
|
|
// and update the local index of the target file.
|
|
|
|
|
|
- p.dbUpdates <- source
|
|
|
+ p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
|
|
|
|
|
|
err = p.shortcutFile(target)
|
|
|
if err != nil {
|
|
|
l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", p.folder, target.Name, source.Name, err)
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
+ p.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile}
|
|
|
} else {
|
|
|
// We failed the rename so we have a source file that we still need to
|
|
|
// get rid of. Attempt to delete it instead so that we make *some*
|
|
|
@@ -774,7 +789,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- p.dbUpdates <- source
|
|
|
+ p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -848,6 +863,11 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
|
|
|
"type": "file",
|
|
|
"action": "metadata",
|
|
|
})
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ p.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile}
|
|
|
+ }
|
|
|
+
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -954,16 +974,13 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) error {
|
|
|
file.Version = file.Version.Merge(cur.Version)
|
|
|
}
|
|
|
|
|
|
- p.dbUpdates <- file
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// shortcutSymlink changes the symlinks type if necessary.
|
|
|
func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
|
|
|
err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags)
|
|
|
- if err == nil {
|
|
|
- p.dbUpdates <- file
|
|
|
- } else {
|
|
|
+ if err != nil {
|
|
|
l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
|
|
|
}
|
|
|
return
|
|
|
@@ -1183,7 +1200,7 @@ func (p *rwFolder) performFinish(state *sharedPullerState) error {
|
|
|
}
|
|
|
|
|
|
// Record the updated file in the index
|
|
|
- p.dbUpdates <- state.file
|
|
|
+ p.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -1239,39 +1256,63 @@ func (p *rwFolder) dbUpdaterRoutine() {
|
|
|
maxBatchTime = 2 * time.Second
|
|
|
)
|
|
|
|
|
|
- batch := make([]protocol.FileInfo, 0, maxBatchSize)
|
|
|
+ batch := make([]dbUpdateJob, 0, maxBatchSize)
|
|
|
+ files := make([]protocol.FileInfo, 0, maxBatchSize)
|
|
|
tick := time.NewTicker(maxBatchTime)
|
|
|
defer tick.Stop()
|
|
|
|
|
|
+ handleBatch := func() {
|
|
|
+ found := false
|
|
|
+ var lastFile protocol.FileInfo
|
|
|
+
|
|
|
+ for _, job := range batch {
|
|
|
+ files = append(files, job.file)
|
|
|
+ if job.file.IsInvalid() || (job.file.IsDirectory() && !job.file.IsSymlink()) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ if job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ found = true
|
|
|
+ lastFile = job.file
|
|
|
+ }
|
|
|
+
|
|
|
+ p.model.updateLocals(p.folder, files)
|
|
|
+
|
|
|
+ if found {
|
|
|
+ p.model.receivedFile(p.folder, lastFile)
|
|
|
+ }
|
|
|
+
|
|
|
+ batch = batch[:0]
|
|
|
+ files = files[:0]
|
|
|
+ }
|
|
|
+
|
|
|
loop:
|
|
|
for {
|
|
|
select {
|
|
|
- case file, ok := <-p.dbUpdates:
|
|
|
+ case job, ok := <-p.dbUpdates:
|
|
|
if !ok {
|
|
|
break loop
|
|
|
}
|
|
|
|
|
|
- file.LocalVersion = 0
|
|
|
- batch = append(batch, file)
|
|
|
+ job.file.LocalVersion = 0
|
|
|
+ batch = append(batch, job)
|
|
|
|
|
|
if len(batch) == maxBatchSize {
|
|
|
- p.model.updateLocals(p.folder, batch)
|
|
|
- p.model.receivedFile(p.folder, batch[len(batch)-1].Name)
|
|
|
- batch = batch[:0]
|
|
|
+ handleBatch()
|
|
|
}
|
|
|
|
|
|
case <-tick.C:
|
|
|
if len(batch) > 0 {
|
|
|
- p.model.updateLocals(p.folder, batch)
|
|
|
- p.model.receivedFile(p.folder, batch[len(batch)-1].Name)
|
|
|
- batch = batch[:0]
|
|
|
+ handleBatch()
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if len(batch) > 0 {
|
|
|
- p.model.updateLocals(p.folder, batch)
|
|
|
- p.model.receivedFile(p.folder, batch[len(batch)-1].Name)
|
|
|
+ handleBatch()
|
|
|
}
|
|
|
}
|
|
|
|