|
@@ -161,20 +161,20 @@ func newSendReceiveFolder(model *model, ignores *ignore.Matcher, cfg config.Fold
|
|
|
|
|
|
|
|
// pull returns true if it manages to get all needed items from peers, i.e. get
|
|
// pull returns true if it manages to get all needed items from peers, i.e. get
|
|
|
// the device in sync with the global state.
|
|
// the device in sync with the global state.
|
|
|
-func (f *sendReceiveFolder) pull() (bool, error) {
|
|
|
|
|
- l.Debugf("%v pulling", f)
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) {
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Pulling")
|
|
|
|
|
|
|
|
scanChan := make(chan string)
|
|
scanChan := make(chan string)
|
|
|
- go f.pullScannerRoutine(scanChan)
|
|
|
|
|
|
|
+ go f.pullScannerRoutine(ctx, scanChan)
|
|
|
defer func() {
|
|
defer func() {
|
|
|
close(scanChan)
|
|
close(scanChan)
|
|
|
f.setState(FolderIdle)
|
|
f.setState(FolderIdle)
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
metricFolderPulls.WithLabelValues(f.ID).Inc()
|
|
metricFolderPulls.WithLabelValues(f.ID).Inc()
|
|
|
- ctx, cancel := context.WithCancel(f.ctx)
|
|
|
|
|
|
|
+ pullCtx, cancel := context.WithCancel(ctx)
|
|
|
defer cancel()
|
|
defer cancel()
|
|
|
- go addTimeUntilCancelled(ctx, metricFolderPullSeconds.WithLabelValues(f.ID))
|
|
|
|
|
|
|
+ go addTimeUntilCancelled(pullCtx, metricFolderPullSeconds.WithLabelValues(f.ID))
|
|
|
|
|
|
|
|
changed := 0
|
|
changed := 0
|
|
|
|
|
|
|
@@ -185,8 +185,8 @@ func (f *sendReceiveFolder) pull() (bool, error) {
|
|
|
var err error
|
|
var err error
|
|
|
for tries := range maxPullerIterations {
|
|
for tries := range maxPullerIterations {
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
- return false, f.ctx.Err()
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ return false, ctx.Err()
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -194,12 +194,12 @@ func (f *sendReceiveFolder) pull() (bool, error) {
|
|
|
// it to FolderSyncing during the last iteration.
|
|
// it to FolderSyncing during the last iteration.
|
|
|
f.setState(FolderSyncPreparing)
|
|
f.setState(FolderSyncPreparing)
|
|
|
|
|
|
|
|
- changed, err = f.pullerIteration(scanChan)
|
|
|
|
|
|
|
+ changed, err = f.pullerIteration(ctx, scanChan)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return false, err
|
|
return false, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- l.Debugln(f, "changed", changed, "on try", tries+1)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Pull iteration completed", "changed", changed, "try", tries+1)
|
|
|
|
|
|
|
|
if changed == 0 {
|
|
if changed == 0 {
|
|
|
// No files were changed by the puller, so we are in
|
|
// No files were changed by the puller, so we are in
|
|
@@ -214,7 +214,7 @@ func (f *sendReceiveFolder) pull() (bool, error) {
|
|
|
if pullErrNum > 0 {
|
|
if pullErrNum > 0 {
|
|
|
f.pullErrors = make([]FileError, 0, len(f.tempPullErrors))
|
|
f.pullErrors = make([]FileError, 0, len(f.tempPullErrors))
|
|
|
for path, err := range f.tempPullErrors {
|
|
for path, err := range f.tempPullErrors {
|
|
|
- f.sl.Warn("Failed to sync", slogutil.FilePath(path), slogutil.Error(err))
|
|
|
|
|
|
|
+ f.sl.WarnContext(ctx, "Failed to sync", slogutil.FilePath(path), slogutil.Error(err))
|
|
|
f.pullErrors = append(f.pullErrors, FileError{
|
|
f.pullErrors = append(f.pullErrors, FileError{
|
|
|
Err: err,
|
|
Err: err,
|
|
|
Path: path,
|
|
Path: path,
|
|
@@ -238,7 +238,7 @@ func (f *sendReceiveFolder) pull() (bool, error) {
|
|
|
// returns the number items that should have been synced (even those that
|
|
// returns the number items that should have been synced (even those that
|
|
|
// might have failed). One puller iteration handles all files currently
|
|
// might have failed). One puller iteration handles all files currently
|
|
|
// flagged as needed in the folder.
|
|
// flagged as needed in the folder.
|
|
|
-func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<- string) (int, error) {
|
|
|
f.errorsMut.Lock()
|
|
f.errorsMut.Lock()
|
|
|
f.tempPullErrors = make(map[string]string)
|
|
f.tempPullErrors = make(map[string]string)
|
|
|
f.errorsMut.Unlock()
|
|
f.errorsMut.Unlock()
|
|
@@ -253,7 +253,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
|
|
|
var doneWg sync.WaitGroup
|
|
var doneWg sync.WaitGroup
|
|
|
var updateWg sync.WaitGroup
|
|
var updateWg sync.WaitGroup
|
|
|
|
|
|
|
|
- l.Debugln(f, "copiers:", f.Copiers, "pullerPendingKiB:", f.PullerMaxPendingKiB)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Starting puller iteration", "copiers", f.Copiers, "pullerPendingKiB", f.PullerMaxPendingKiB)
|
|
|
|
|
|
|
|
updateWg.Add(1)
|
|
updateWg.Add(1)
|
|
|
go func() {
|
|
go func() {
|
|
@@ -266,7 +266,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
|
|
|
copyWg.Add(1)
|
|
copyWg.Add(1)
|
|
|
go func() {
|
|
go func() {
|
|
|
// copierRoutine finishes when copyChan is closed
|
|
// copierRoutine finishes when copyChan is closed
|
|
|
- f.copierRoutine(copyChan, pullChan, finisherChan)
|
|
|
|
|
|
|
+ f.copierRoutine(ctx, copyChan, pullChan, finisherChan)
|
|
|
copyWg.Done()
|
|
copyWg.Done()
|
|
|
}()
|
|
}()
|
|
|
}
|
|
}
|
|
@@ -274,18 +274,18 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
|
|
|
pullWg.Add(1)
|
|
pullWg.Add(1)
|
|
|
go func() {
|
|
go func() {
|
|
|
// pullerRoutine finishes when pullChan is closed
|
|
// pullerRoutine finishes when pullChan is closed
|
|
|
- f.pullerRoutine(pullChan, finisherChan)
|
|
|
|
|
|
|
+ f.pullerRoutine(ctx, pullChan, finisherChan)
|
|
|
pullWg.Done()
|
|
pullWg.Done()
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
doneWg.Add(1)
|
|
doneWg.Add(1)
|
|
|
// finisherRoutine finishes when finisherChan is closed
|
|
// finisherRoutine finishes when finisherChan is closed
|
|
|
go func() {
|
|
go func() {
|
|
|
- f.finisherRoutine(finisherChan, dbUpdateChan, scanChan)
|
|
|
|
|
|
|
+ f.finisherRoutine(ctx, finisherChan, dbUpdateChan, scanChan)
|
|
|
doneWg.Done()
|
|
doneWg.Done()
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
- changed, fileDeletions, dirDeletions, err := f.processNeeded(dbUpdateChan, copyChan, scanChan)
|
|
|
|
|
|
|
+ changed, fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan)
|
|
|
|
|
|
|
|
// Signal copy and puller routines that we are done with the in data for
|
|
// Signal copy and puller routines that we are done with the in data for
|
|
|
// this iteration. Wait for them to finish.
|
|
// this iteration. Wait for them to finish.
|
|
@@ -300,7 +300,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
|
|
|
doneWg.Wait()
|
|
doneWg.Wait()
|
|
|
|
|
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
- f.processDeletions(fileDeletions, dirDeletions, dbUpdateChan, scanChan)
|
|
|
|
|
|
|
+ f.processDeletions(ctx, fileDeletions, dirDeletions, dbUpdateChan, scanChan)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Wait for db updates and scan scheduling to complete
|
|
// Wait for db updates and scan scheduling to complete
|
|
@@ -312,7 +312,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
|
|
|
return changed, err
|
|
return changed, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
|
|
|
changed := 0
|
|
changed := 0
|
|
|
var dirDeletions []protocol.FileInfo
|
|
var dirDeletions []protocol.FileInfo
|
|
|
fileDeletions := map[string]protocol.FileInfo{}
|
|
fileDeletions := map[string]protocol.FileInfo{}
|
|
@@ -328,13 +328,13 @@ loop:
|
|
|
return changed, nil, nil, err
|
|
return changed, nil, nil, err
|
|
|
}
|
|
}
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
break loop
|
|
break loop
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if f.IgnoreDelete && file.IsDeleted() {
|
|
if f.IgnoreDelete && file.IsDeleted() {
|
|
|
- l.Debugln(f, "ignore file deletion (config)", file.FileName())
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Ignoring file deletion per config", slogutil.FilePath(file.FileName()))
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -343,7 +343,7 @@ loop:
|
|
|
switch {
|
|
switch {
|
|
|
case f.ignores.Match(file.Name).IsIgnored():
|
|
case f.ignores.Match(file.Name).IsIgnored():
|
|
|
file.SetIgnored()
|
|
file.SetIgnored()
|
|
|
- l.Debugln(f, "Handling ignored file", file)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Handling ignored file", file.LogAttr())
|
|
|
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
|
|
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
|
|
|
|
|
|
|
|
case build.IsWindows && fs.WindowsInvalidFilename(file.Name) != nil:
|
|
case build.IsWindows && fs.WindowsInvalidFilename(file.Name) != nil:
|
|
@@ -409,17 +409,17 @@ loop:
|
|
|
break
|
|
break
|
|
|
}
|
|
}
|
|
|
file.SetUnsupported()
|
|
file.SetUnsupported()
|
|
|
- l.Debugln(f, "Invalidating symlink (unsupported)", file.Name)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Invalidating unsupported symlink", slogutil.FilePath(file.Name))
|
|
|
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
|
|
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
|
|
|
|
|
|
|
|
case file.IsDirectory() && !file.IsSymlink():
|
|
case file.IsDirectory() && !file.IsSymlink():
|
|
|
- l.Debugln(f, "Handling directory", file.Name)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Handling directory", slogutil.FilePath(file.Name))
|
|
|
if f.checkParent(file.Name, scanChan) {
|
|
if f.checkParent(file.Name, scanChan) {
|
|
|
f.handleDir(file, dbUpdateChan, scanChan)
|
|
f.handleDir(file, dbUpdateChan, scanChan)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
case file.IsSymlink():
|
|
case file.IsSymlink():
|
|
|
- l.Debugln(f, "Handling symlink", file.Name)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Handling symlink", slogutil.FilePath(file.Name))
|
|
|
if f.checkParent(file.Name, scanChan) {
|
|
if f.checkParent(file.Name, scanChan) {
|
|
|
f.handleSymlink(file, dbUpdateChan, scanChan)
|
|
f.handleSymlink(file, dbUpdateChan, scanChan)
|
|
|
}
|
|
}
|
|
@@ -430,8 +430,8 @@ loop:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
- return changed, nil, nil, f.ctx.Err()
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ return changed, nil, nil, ctx.Err()
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -440,8 +440,8 @@ loop:
|
|
|
nextFile:
|
|
nextFile:
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
- return changed, fileDeletions, dirDeletions, f.ctx.Err()
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ return changed, fileDeletions, dirDeletions, ctx.Err()
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -481,7 +481,7 @@ nextFile:
|
|
|
// map.
|
|
// map.
|
|
|
desired := fileDeletions[candidate.Name]
|
|
desired := fileDeletions[candidate.Name]
|
|
|
if err := f.renameFile(candidate, desired, fi, dbUpdateChan, scanChan); err != nil {
|
|
if err := f.renameFile(candidate, desired, fi, dbUpdateChan, scanChan); err != nil {
|
|
|
- l.Debugf("rename shortcut for %s failed: %s", fi.Name, err.Error())
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Rename shortcut failed", slogutil.FilePath(fi.Name), slogutil.Error(err))
|
|
|
// Failed to rename, try next one.
|
|
// Failed to rename, try next one.
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
@@ -510,7 +510,7 @@ nextFile:
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if err := f.handleFile(fi, copyChan); err != nil {
|
|
|
|
|
|
|
+ if err := f.handleFile(ctx, fi, copyChan); err != nil {
|
|
|
f.newPullError(fileName, err)
|
|
f.newPullError(fileName, err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -528,10 +528,10 @@ func popCandidate(buckets map[string][]protocol.FileInfo, key string) (protocol.
|
|
|
return cands[0], true
|
|
return cands[0], true
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) processDeletions(ctx context.Context, fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
for _, file := range fileDeletions {
|
|
for _, file := range fileDeletions {
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
return
|
|
return
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
@@ -542,13 +542,13 @@ func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.F
|
|
|
// Process in reverse order to delete depth first
|
|
// Process in reverse order to delete depth first
|
|
|
for i := range dirDeletions {
|
|
for i := range dirDeletions {
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
return
|
|
return
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
dir := dirDeletions[len(dirDeletions)-i-1]
|
|
dir := dirDeletions[len(dirDeletions)-i-1]
|
|
|
- l.Debugln(f, "Deleting dir", dir.Name)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Deleting directory", slogutil.FilePath(dir.Name))
|
|
|
f.deleteDir(dir, dbUpdateChan, scanChan)
|
|
f.deleteDir(dir, dbUpdateChan, scanChan)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -708,10 +708,10 @@ func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) boo
|
|
|
// Encrypted files have made-up filenames with two synthetic parent
|
|
// Encrypted files have made-up filenames with two synthetic parent
|
|
|
// directories which don't have any meaning. Create those if necessary.
|
|
// directories which don't have any meaning. Create those if necessary.
|
|
|
if _, err := f.mtimefs.Lstat(parent); !fs.IsNotExist(err) {
|
|
if _, err := f.mtimefs.Lstat(parent); !fs.IsNotExist(err) {
|
|
|
- l.Debugf("%v parent not missing %v", f, file)
|
|
|
|
|
|
|
+ f.sl.Debug("Parent directory exists", slogutil.FilePath(file))
|
|
|
return true
|
|
return true
|
|
|
}
|
|
}
|
|
|
- l.Debugf("%v creating parent directory of %v", f, file)
|
|
|
|
|
|
|
+ f.sl.Debug("Creating parent directory", slogutil.FilePath(file))
|
|
|
if err := f.mtimefs.MkdirAll(parent, 0o755); err != nil {
|
|
if err := f.mtimefs.MkdirAll(parent, 0o755); err != nil {
|
|
|
f.newPullError(file, fmt.Errorf("creating parent dir: %w", err))
|
|
f.newPullError(file, fmt.Errorf("creating parent dir: %w", err))
|
|
|
return false
|
|
return false
|
|
@@ -880,7 +880,7 @@ func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, h
|
|
|
// care not declare another err.
|
|
// care not declare another err.
|
|
|
var err error
|
|
var err error
|
|
|
|
|
|
|
|
- l.Debugln(f, "Deleting file or symlink", file.Name)
|
|
|
|
|
|
|
+ f.sl.Debug("Deleting file or symlink", slogutil.FilePath(file.Name))
|
|
|
|
|
|
|
|
f.evLogger.Log(events.ItemStarted, map[string]string{
|
|
f.evLogger.Log(events.ItemStarted, map[string]string{
|
|
|
"folder": f.folderID,
|
|
"folder": f.folderID,
|
|
@@ -1001,7 +1001,7 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db
|
|
|
})
|
|
})
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
- l.Debugln(f, "taking rename shortcut", source.Name, "->", target.Name)
|
|
|
|
|
|
|
+ f.sl.Debug("Taking rename shortcut", "from", source.Name, "to", target.Name)
|
|
|
|
|
|
|
|
// Check that source is compatible with what we have in the DB
|
|
// Check that source is compatible with what we have in the DB
|
|
|
if err = f.checkToBeDeleted(source, cur, true, scanChan); err != nil {
|
|
if err = f.checkToBeDeleted(source, cur, true, scanChan); err != nil {
|
|
@@ -1130,7 +1130,7 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db
|
|
|
|
|
|
|
|
// handleFile queues the copies and pulls as necessary for a single new or
|
|
// handleFile queues the copies and pulls as necessary for a single new or
|
|
|
// changed file.
|
|
// changed file.
|
|
|
-func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState) error {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) handleFile(ctx context.Context, file protocol.FileInfo, copyChan chan<- copyBlocksState) error {
|
|
|
curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
|
|
curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
@@ -1146,7 +1146,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
|
|
|
reused := make([]int, 0, len(file.Blocks))
|
|
reused := make([]int, 0, len(file.Blocks))
|
|
|
|
|
|
|
|
if f.Type != config.FolderTypeReceiveEncrypted {
|
|
if f.Type != config.FolderTypeReceiveEncrypted {
|
|
|
- blocks, reused = f.reuseBlocks(blocks, reused, file, tempName)
|
|
|
|
|
|
|
+ blocks, reused = f.reuseBlocks(ctx, blocks, reused, file, tempName)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// The sharedpullerstate will know which flags to use when opening the
|
|
// The sharedpullerstate will know which flags to use when opening the
|
|
@@ -1170,7 +1170,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
|
|
|
|
|
|
|
|
s := newSharedPullerState(file, f.mtimefs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles, !f.DisableFsync)
|
|
s := newSharedPullerState(file, f.mtimefs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles, !f.DisableFsync)
|
|
|
|
|
|
|
|
- l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), len(reused))
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Handling file", slogutil.FilePath(file.Name), "blocksToCopy", len(blocks), "reused", len(reused))
|
|
|
|
|
|
|
|
cs := copyBlocksState{
|
|
cs := copyBlocksState{
|
|
|
sharedPullerState: s,
|
|
sharedPullerState: s,
|
|
@@ -1181,15 +1181,15 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) reuseBlocks(blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) reuseBlocks(ctx context.Context, blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
|
|
|
// Check for an old temporary file which might have some blocks we could
|
|
// Check for an old temporary file which might have some blocks we could
|
|
|
// reuse.
|
|
// reuse.
|
|
|
- tempBlocks, err := scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
|
|
|
|
|
|
|
+ tempBlocks, err := scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
var caseErr *fs.CaseConflictError
|
|
var caseErr *fs.CaseConflictError
|
|
|
if errors.As(err, &caseErr) {
|
|
if errors.As(err, &caseErr) {
|
|
|
if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil {
|
|
if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil {
|
|
|
- tempBlocks, err = scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
|
|
|
|
|
|
|
+ tempBlocks, err = scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -1262,7 +1262,7 @@ func populateOffsets(blocks []protocol.BlockInfo) {
|
|
|
// shortcutFile sets file metadata, when that's the only thing that has
|
|
// shortcutFile sets file metadata, when that's the only thing that has
|
|
|
// changed.
|
|
// changed.
|
|
|
func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
|
|
func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
|
|
|
- l.Debugln(f, "taking shortcut on", file.Name)
|
|
|
|
|
|
|
+ f.sl.Debug("Taking metadata shortcut", slogutil.FilePath(file.Name))
|
|
|
|
|
|
|
|
f.evLogger.Log(events.ItemStarted, map[string]string{
|
|
f.evLogger.Log(events.ItemStarted, map[string]string{
|
|
|
"folder": f.folderID,
|
|
"folder": f.folderID,
|
|
@@ -1330,7 +1330,7 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch
|
|
|
|
|
|
|
|
// copierRoutine reads copierStates until the in channel closes and performs
|
|
// copierRoutine reads copierStates until the in channel closes and performs
|
|
|
// the relevant copies when possible, or passes it to the puller routine.
|
|
// the relevant copies when possible, or passes it to the puller routine.
|
|
|
-func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) copierRoutine(ctx context.Context, in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
|
|
|
otherFolderFilesystems := make(map[string]fs.Filesystem)
|
|
otherFolderFilesystems := make(map[string]fs.Filesystem)
|
|
|
for folder, cfg := range f.model.cfg.Folders() {
|
|
for folder, cfg := range f.model.cfg.Folders() {
|
|
|
if folder == f.ID {
|
|
if folder == f.ID {
|
|
@@ -1349,8 +1349,8 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
|
|
blocks:
|
|
blocks:
|
|
|
for _, block := range state.blocks {
|
|
for _, block := range state.blocks {
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
- state.fail(fmt.Errorf("folder stopped: %w", f.ctx.Err()))
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
|
|
|
break blocks
|
|
break blocks
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
@@ -1368,7 +1368,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if f.copyBlock(block, state, otherFolderFilesystems) {
|
|
|
|
|
|
|
+ if f.copyBlock(ctx, block, state, otherFolderFilesystems) {
|
|
|
state.copyDone(block)
|
|
state.copyDone(block)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
@@ -1397,13 +1397,13 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Returns true when the block was successfully copied.
|
|
// Returns true when the block was successfully copied.
|
|
|
-func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) copyBlock(ctx context.Context, block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool {
|
|
|
buf := protocol.BufferPool.Get(block.Size)
|
|
buf := protocol.BufferPool.Get(block.Size)
|
|
|
defer protocol.BufferPool.Put(buf)
|
|
defer protocol.BufferPool.Put(buf)
|
|
|
|
|
|
|
|
// Hope that it's usually in the same folder, so start with that
|
|
// Hope that it's usually in the same folder, so start with that
|
|
|
// one. Also possibly more efficient copy (same filesystem).
|
|
// one. Also possibly more efficient copy (same filesystem).
|
|
|
- if f.copyBlockFromFolder(f.ID, block, state, f.mtimefs, buf) {
|
|
|
|
|
|
|
+ if f.copyBlockFromFolder(ctx, f.ID, block, state, f.mtimefs, buf) {
|
|
|
return true
|
|
return true
|
|
|
}
|
|
}
|
|
|
if state.failed() != nil {
|
|
if state.failed() != nil {
|
|
@@ -1411,7 +1411,7 @@ func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocks
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for folderID, ffs := range otherFolderFilesystems {
|
|
for folderID, ffs := range otherFolderFilesystems {
|
|
|
- if f.copyBlockFromFolder(folderID, block, state, ffs, buf) {
|
|
|
|
|
|
|
+ if f.copyBlockFromFolder(ctx, folderID, block, state, ffs, buf) {
|
|
|
return true
|
|
return true
|
|
|
}
|
|
}
|
|
|
if state.failed() != nil {
|
|
if state.failed() != nil {
|
|
@@ -1424,17 +1424,17 @@ func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocks
|
|
|
|
|
|
|
|
// Returns true when the block was successfully copied.
|
|
// Returns true when the block was successfully copied.
|
|
|
// The passed buffer must be large enough to accommodate the block.
|
|
// The passed buffer must be large enough to accommodate the block.
|
|
|
-func (f *sendReceiveFolder) copyBlockFromFolder(folderID string, block protocol.BlockInfo, state copyBlocksState, ffs fs.Filesystem, buf []byte) bool {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) copyBlockFromFolder(ctx context.Context, folderID string, block protocol.BlockInfo, state copyBlocksState, ffs fs.Filesystem, buf []byte) bool {
|
|
|
for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
|
|
for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
// We just ignore this and continue pulling instead (though
|
|
// We just ignore this and continue pulling instead (though
|
|
|
// there's a good chance that will fail too, if the DB is
|
|
// there's a good chance that will fail too, if the DB is
|
|
|
// unhealthy).
|
|
// unhealthy).
|
|
|
- l.Debugf("Failed to get information from DB about block %v in copier (folderID %v, file %v): %v", block.Hash, f.folderID, state.file.Name, err)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Failed to get block information from database", "blockHash", block.Hash, slogutil.FilePath(state.file.Name), slogutil.Error(err))
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if !f.copyBlockFromFile(e.FileName, e.Offset, state, ffs, block, buf) {
|
|
|
|
|
|
|
+ if !f.copyBlockFromFile(ctx, e.FileName, e.Offset, state, ffs, block, buf) {
|
|
|
if state.failed() != nil {
|
|
if state.failed() != nil {
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
@@ -1454,17 +1454,17 @@ func (f *sendReceiveFolder) copyBlockFromFolder(folderID string, block protocol.
|
|
|
|
|
|
|
|
// Returns true when the block was successfully copied.
|
|
// Returns true when the block was successfully copied.
|
|
|
// The passed buffer must be large enough to accommodate the block.
|
|
// The passed buffer must be large enough to accommodate the block.
|
|
|
-func (f *sendReceiveFolder) copyBlockFromFile(srcName string, srcOffset int64, state copyBlocksState, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) bool {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) copyBlockFromFile(ctx context.Context, srcName string, srcOffset int64, state copyBlocksState, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) bool {
|
|
|
fd, err := ffs.Open(srcName)
|
|
fd, err := ffs.Open(srcName)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- l.Debugf("Failed to open file %v trying to copy block %v (folderID %v): %v", srcName, block.Hash, f.folderID, err)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Failed to open source file for block copy", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
defer fd.Close()
|
|
defer fd.Close()
|
|
|
|
|
|
|
|
_, err = fd.ReadAt(buf, srcOffset)
|
|
_, err = fd.ReadAt(buf, srcOffset)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- l.Debugf("Failed to read block from file %v in copier (folderID: %v, hash: %v): %v", srcName, f.folderID, block.Hash, err)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Failed to read block from file", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1473,7 +1473,7 @@ func (f *sendReceiveFolder) copyBlockFromFile(srcName string, srcOffset int64, s
|
|
|
// trust. (The other side can and will verify.)
|
|
// trust. (The other side can and will verify.)
|
|
|
if f.Type != config.FolderTypeReceiveEncrypted {
|
|
if f.Type != config.FolderTypeReceiveEncrypted {
|
|
|
if err := f.verifyBuffer(buf, block); err != nil {
|
|
if err := f.verifyBuffer(buf, block); err != nil {
|
|
|
- l.Debugf("Failed to verify buffer in copier (folderID: %v): %v", f.folderID, err)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Failed to verify block buffer", slogutil.Error(err))
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -1485,13 +1485,13 @@ func (f *sendReceiveFolder) copyBlockFromFile(srcName string, srcOffset int64, s
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if f.CopyRangeMethod != config.CopyRangeMethodStandard {
|
|
if f.CopyRangeMethod != config.CopyRangeMethodStandard {
|
|
|
- err = f.withLimiter(func() error {
|
|
|
|
|
|
|
+ err = f.withLimiter(ctx, func() error {
|
|
|
dstFd.mut.Lock()
|
|
dstFd.mut.Lock()
|
|
|
defer dstFd.mut.Unlock()
|
|
defer dstFd.mut.Unlock()
|
|
|
return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
|
|
return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
|
|
|
})
|
|
})
|
|
|
} else {
|
|
} else {
|
|
|
- err = f.limitedWriteAt(dstFd, buf, block.Offset)
|
|
|
|
|
|
|
+ err = f.limitedWriteAt(ctx, dstFd, buf, block.Offset)
|
|
|
}
|
|
}
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
state.fail(fmt.Errorf("dst write: %w", err))
|
|
state.fail(fmt.Errorf("dst write: %w", err))
|
|
@@ -1513,7 +1513,7 @@ func (*sendReceiveFolder) verifyBuffer(buf []byte, block protocol.BlockInfo) err
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) pullerRoutine(ctx context.Context, in <-chan pullBlockState, out chan<- *sharedPullerState) {
|
|
|
requestLimiter := semaphore.New(f.PullerMaxPendingKiB * 1024)
|
|
requestLimiter := semaphore.New(f.PullerMaxPendingKiB * 1024)
|
|
|
var wg sync.WaitGroup
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
|
@@ -1531,7 +1531,7 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *
|
|
|
|
|
|
|
|
bytes := state.block.Size
|
|
bytes := state.block.Size
|
|
|
|
|
|
|
|
- if err := requestLimiter.TakeWithContext(f.ctx, bytes); err != nil {
|
|
|
|
|
|
|
+ if err := requestLimiter.TakeWithContext(ctx, bytes); err != nil {
|
|
|
state.fail(err)
|
|
state.fail(err)
|
|
|
out <- state.sharedPullerState
|
|
out <- state.sharedPullerState
|
|
|
continue
|
|
continue
|
|
@@ -1543,13 +1543,13 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *
|
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
|
defer requestLimiter.Give(bytes)
|
|
defer requestLimiter.Give(bytes)
|
|
|
|
|
|
|
|
- f.pullBlock(state, out)
|
|
|
|
|
|
|
+ f.pullBlock(ctx, state, out)
|
|
|
}()
|
|
}()
|
|
|
}
|
|
}
|
|
|
wg.Wait()
|
|
wg.Wait()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPullerState) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) pullBlock(ctx context.Context, state pullBlockState, out chan<- *sharedPullerState) {
|
|
|
// Get an fd to the temporary file. Technically we don't need it until
|
|
// Get an fd to the temporary file. Technically we don't need it until
|
|
|
// after fetching the block, but if we run into an error here there is
|
|
// after fetching the block, but if we run into an error here there is
|
|
|
// no point in issuing the request to the network.
|
|
// no point in issuing the request to the network.
|
|
@@ -1572,8 +1572,8 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
|
|
|
loop:
|
|
loop:
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
- case <-f.ctx.Done():
|
|
|
|
|
- state.fail(fmt.Errorf("folder stopped: %w", f.ctx.Err()))
|
|
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
|
|
|
break loop
|
|
break loop
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
@@ -1600,10 +1600,10 @@ loop:
|
|
|
activity.using(selected)
|
|
activity.using(selected)
|
|
|
var buf []byte
|
|
var buf []byte
|
|
|
blockNo := int(state.block.Offset / int64(state.file.BlockSize()))
|
|
blockNo := int(state.block.Offset / int64(state.file.BlockSize()))
|
|
|
- buf, lastError = f.model.RequestGlobal(f.ctx, selected.ID, f.folderID, state.file.Name, blockNo, state.block.Offset, state.block.Size, state.block.Hash, selected.FromTemporary)
|
|
|
|
|
|
|
+ buf, lastError = f.model.RequestGlobal(ctx, selected.ID, f.folderID, state.file.Name, blockNo, state.block.Offset, state.block.Size, state.block.Hash, selected.FromTemporary)
|
|
|
activity.done(selected)
|
|
activity.done(selected)
|
|
|
if lastError != nil {
|
|
if lastError != nil {
|
|
|
- l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, selected.ID.Short(), "returned error:", lastError)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Block request returned error", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size, "device", selected.ID.Short(), slogutil.Error(lastError))
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1617,12 +1617,12 @@ loop:
|
|
|
lastError = f.verifyBuffer(buf, state.block)
|
|
lastError = f.verifyBuffer(buf, state.block)
|
|
|
}
|
|
}
|
|
|
if lastError != nil {
|
|
if lastError != nil {
|
|
|
- l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Block hash mismatch", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Save the block data we got from the cluster
|
|
// Save the block data we got from the cluster
|
|
|
- err = f.limitedWriteAt(fd, buf, state.block.Offset)
|
|
|
|
|
|
|
+ err = f.limitedWriteAt(ctx, fd, buf, state.block.Offset)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
state.fail(fmt.Errorf("save: %w", err))
|
|
state.fail(fmt.Errorf("save: %w", err))
|
|
|
} else {
|
|
} else {
|
|
@@ -1687,10 +1687,10 @@ func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCu
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) finisherRoutine(ctx context.Context, in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
|
|
|
for state := range in {
|
|
for state := range in {
|
|
|
if closed, err := state.finalClose(); closed {
|
|
if closed, err := state.finalClose(); closed {
|
|
|
- l.Debugln(f, "closing", state.file.Name)
|
|
|
|
|
|
|
+ f.sl.DebugContext(ctx, "Closing temp file", slogutil.FilePath(state.file.Name))
|
|
|
|
|
|
|
|
f.queue.Done(state.file.Name)
|
|
f.queue.Done(state.file.Name)
|
|
|
|
|
|
|
@@ -1701,7 +1701,7 @@ func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpda
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
f.newPullError(state.file.Name, fmt.Errorf("finishing: %w", err))
|
|
f.newPullError(state.file.Name, fmt.Errorf("finishing: %w", err))
|
|
|
} else {
|
|
} else {
|
|
|
- slog.Info("Synced file", f.LogAttr(), state.file.LogAttr(), slog.Group("blocks", slog.Int("local", state.reused+state.copyTotal), slog.Int("download", state.pullTotal)))
|
|
|
|
|
|
|
+ slog.InfoContext(ctx, "Synced file", f.LogAttr(), state.file.LogAttr(), slog.Group("blocks", slog.Int("local", state.reused+state.copyTotal), slog.Int("download", state.pullTotal)))
|
|
|
|
|
|
|
|
minBlocksPerBlock := state.file.BlockSize() / protocol.MinBlockSize
|
|
minBlocksPerBlock := state.file.BlockSize() / protocol.MinBlockSize
|
|
|
blockStatsMut.Lock()
|
|
blockStatsMut.Lock()
|
|
@@ -1756,11 +1756,11 @@ func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) {
|
|
|
if !f.DisableFsync {
|
|
if !f.DisableFsync {
|
|
|
fd, err := f.mtimefs.Open(dir)
|
|
fd, err := f.mtimefs.Open(dir)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- l.Debugf("fsync %q failed: %v", dir, err)
|
|
|
|
|
|
|
+ f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
if err := fd.Sync(); err != nil {
|
|
if err := fd.Sync(); err != nil {
|
|
|
- l.Debugf("fsync %q failed: %v", dir, err)
|
|
|
|
|
|
|
+ f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
|
|
|
}
|
|
}
|
|
|
fd.Close()
|
|
fd.Close()
|
|
|
}
|
|
}
|
|
@@ -1832,7 +1832,7 @@ loop:
|
|
|
|
|
|
|
|
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
|
|
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
|
|
|
// scheduled once when scanChan is closed (scanning can not happen during pulling).
|
|
// scheduled once when scanChan is closed (scanning can not happen during pulling).
|
|
|
-func (f *sendReceiveFolder) pullScannerRoutine(scanChan <-chan string) {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) pullScannerRoutine(ctx context.Context, scanChan <-chan string) {
|
|
|
toBeScanned := make(map[string]struct{})
|
|
toBeScanned := make(map[string]struct{})
|
|
|
|
|
|
|
|
for path := range scanChan {
|
|
for path := range scanChan {
|
|
@@ -1842,7 +1842,7 @@ func (f *sendReceiveFolder) pullScannerRoutine(scanChan <-chan string) {
|
|
|
if len(toBeScanned) != 0 {
|
|
if len(toBeScanned) != 0 {
|
|
|
scanList := make([]string, 0, len(toBeScanned))
|
|
scanList := make([]string, 0, len(toBeScanned))
|
|
|
for path := range toBeScanned {
|
|
for path := range toBeScanned {
|
|
|
- l.Debugln(f, "scheduling scan after pulling for", path)
|
|
|
|
|
|
|
+ slog.DebugContext(ctx, "Scheduling scan after pulling", slogutil.FilePath(path))
|
|
|
scanList = append(scanList, path)
|
|
scanList = append(scanList, path)
|
|
|
}
|
|
}
|
|
|
f.Scan(scanList)
|
|
f.Scan(scanList)
|
|
@@ -1883,7 +1883,7 @@ func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan cha
|
|
|
})
|
|
})
|
|
|
for _, match := range matches[f.MaxConflicts:] {
|
|
for _, match := range matches[f.MaxConflicts:] {
|
|
|
if gerr := f.mtimefs.Remove(match); gerr != nil {
|
|
if gerr := f.mtimefs.Remove(match); gerr != nil {
|
|
|
- l.Debugln(f, "removing extra conflict", gerr)
|
|
|
|
|
|
|
+ f.sl.Debug("Failed to remove extra conflict copy", slogutil.Error(gerr))
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -1895,7 +1895,7 @@ func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan cha
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (f *sendReceiveFolder) newPullError(path string, err error) {
|
|
func (f *sendReceiveFolder) newPullError(path string, err error) {
|
|
|
- if errors.Is(err, f.ctx.Err()) {
|
|
|
|
|
|
|
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
|
|
// Error because the folder stopped - no point logging/tracking
|
|
// Error because the folder stopped - no point logging/tracking
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -1916,7 +1916,7 @@ func (f *sendReceiveFolder) newPullError(path string, err error) {
|
|
|
errStr := fmt.Sprintf("syncing: %s", err)
|
|
errStr := fmt.Sprintf("syncing: %s", err)
|
|
|
f.tempPullErrors[path] = errStr
|
|
f.tempPullErrors[path] = errStr
|
|
|
|
|
|
|
|
- l.Debugf("%v new error for %v: %v", f, path, err)
|
|
|
|
|
|
|
+ f.sl.Debug("New pull error", slogutil.FilePath(path), slogutil.Error(err))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// deleteItemOnDisk deletes the file represented by old that is about to be replaced by new.
|
|
// deleteItemOnDisk deletes the file represented by old that is about to be replaced by new.
|
|
@@ -2116,7 +2116,7 @@ func (f *sendReceiveFolder) scanIfItemChanged(name string, stat fs.FileInfo, ite
|
|
|
// I.e. non-nil error status means "Do not delete!" or "is already deleted".
|
|
// I.e. non-nil error status means "Do not delete!" or "is already deleted".
|
|
|
func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur bool, scanChan chan<- string) error {
|
|
func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur bool, scanChan chan<- string) error {
|
|
|
if err := osutil.TraversesSymlink(f.mtimefs, filepath.Dir(file.Name)); err != nil {
|
|
if err := osutil.TraversesSymlink(f.mtimefs, filepath.Dir(file.Name)); err != nil {
|
|
|
- l.Debugln(f, "not deleting item behind symlink on disk, but update db", file.Name)
|
|
|
|
|
|
|
+ f.sl.Debug("Not deleting item behind symlink on disk, but updating database", slogutil.FilePath(file.Name))
|
|
|
return fs.ErrNotExist
|
|
return fs.ErrNotExist
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -2130,7 +2130,7 @@ func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur
|
|
|
scanChan <- file.Name
|
|
scanChan <- file.Name
|
|
|
return errModified
|
|
return errModified
|
|
|
}
|
|
}
|
|
|
- l.Debugln(f, "not deleting item we don't have, but update db", file.Name)
|
|
|
|
|
|
|
+ f.sl.Debug("Not deleting item we don't have, but updating database", slogutil.FilePath(file.Name))
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -2144,7 +2144,7 @@ func (f *sendReceiveFolder) setPlatformData(file *protocol.FileInfo, name string
|
|
|
if f.SyncXattrs {
|
|
if f.SyncXattrs {
|
|
|
// Set extended attributes.
|
|
// Set extended attributes.
|
|
|
if err := f.mtimefs.SetXattr(name, file.Platform.Xattrs(), f.XattrFilter); errors.Is(err, fs.ErrXattrsNotSupported) {
|
|
if err := f.mtimefs.SetXattr(name, file.Platform.Xattrs(), f.XattrFilter); errors.Is(err, fs.ErrXattrsNotSupported) {
|
|
|
- l.Debugf("Cannot set xattrs on %q: %v", file.Name, err)
|
|
|
|
|
|
|
+ f.sl.Debug("Cannot set xattrs (not supported)", slogutil.FilePath(file.Name), slogutil.Error(err))
|
|
|
} else if err != nil {
|
|
} else if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
@@ -2185,15 +2185,15 @@ func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) er
|
|
|
return inWritableDir(fn, f.mtimefs, path, f.IgnorePerms)
|
|
return inWritableDir(fn, f.mtimefs, path, f.IgnorePerms)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) limitedWriteAt(fd io.WriterAt, data []byte, offset int64) error {
|
|
|
|
|
- return f.withLimiter(func() error {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) limitedWriteAt(ctx context.Context, fd io.WriterAt, data []byte, offset int64) error {
|
|
|
|
|
+ return f.withLimiter(ctx, func() error {
|
|
|
_, err := fd.WriteAt(data, offset)
|
|
_, err := fd.WriteAt(data, offset)
|
|
|
return err
|
|
return err
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (f *sendReceiveFolder) withLimiter(fn func() error) error {
|
|
|
|
|
- if err := f.writeLimiter.TakeWithContext(f.ctx, 1); err != nil {
|
|
|
|
|
|
|
+func (f *sendReceiveFolder) withLimiter(ctx context.Context, fn func() error) error {
|
|
|
|
|
+ if err := f.writeLimiter.TakeWithContext(ctx, 1); err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
defer f.writeLimiter.Give(1)
|
|
defer f.writeLimiter.Give(1)
|
|
@@ -2235,7 +2235,7 @@ func existingConflicts(name string, fs fs.Filesystem) []string {
|
|
|
ext := filepath.Ext(name)
|
|
ext := filepath.Ext(name)
|
|
|
matches, err := fs.Glob(name[:len(name)-len(ext)] + ".sync-conflict-????????-??????*" + ext)
|
|
matches, err := fs.Glob(name[:len(name)-len(ext)] + ".sync-conflict-????????-??????*" + ext)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- l.Debugln("globbing for conflicts", err)
|
|
|
|
|
|
|
+ slog.Debug("Globbing for conflicts failed", slogutil.Error(err))
|
|
|
}
|
|
}
|
|
|
return matches
|
|
return matches
|
|
|
}
|
|
}
|