Browse Source

chore(model): refactor copier for more flatness (#10094)

Flattened the copier code more. Also removing and moving some
parameters/return values to simplify things. Generally rely less on
return values, e.g. by handling errors right away and using `state` to
do the right thing (e.g. abort on failure).

Supposed to be a refactor without any behaviour changes, except for
fixing a tiny regression on folder order: We used to try copying from
the same folder first, but lost that property at some point (also sent a
PR fixing only that, I'd merge that first making this refactor only).
Simon Frei 6 months ago
parent
commit
821d6f43ac
2 changed files with 106 additions and 67 deletions
  1. 94 67
      lib/model/folder_sendrecv.go
  2. 12 0
      lib/model/folder_sendrecv_test.go

+ 94 - 67
lib/model/folder_sendrecv.go

@@ -1291,14 +1291,12 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 		protocol.BufferPool.Put(buf)
 	}()
 
-	folderFilesystems := make(map[string]fs.Filesystem)
-	// Hope that it's usually in the same folder, so start with that one.
-	folders := []string{f.folderID}
+	otherFolderFilesystems := make(map[string]fs.Filesystem)
 	for folder, cfg := range f.model.cfg.Folders() {
-		folderFilesystems[folder] = cfg.Filesystem()
-		if folder != f.folderID {
-			folders = append(folders, folder)
+		if folder == f.ID {
+			continue
 		}
+		otherFolderFilesystems[folder] = cfg.Filesystem()
 	}
 
 	for state := range in {
@@ -1309,13 +1307,6 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 			continue
 		}
 
-		dstFd, err := state.tempFile()
-		if err != nil {
-			// Nothing more to do for this failed file, since we couldn't create a temporary for it.
-			out <- state.sharedPullerState
-			continue
-		}
-
 		if f.Type != config.FolderTypeReceiveEncrypted {
 			f.model.progressEmitter.Register(state.sharedPullerState)
 		}
@@ -1342,53 +1333,27 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 				continue
 			}
 
-			buf = protocol.BufferPool.Upgrade(buf, int(block.Size))
-			copied := false
-
-		folders:
-			// Intentionally not iterating over `folderFilesystems` directly,
-			// to preserve order (same folder first).
-			for _, folderID := range folders {
-				ffs := folderFilesystems[folderID]
-				for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
-					if err != nil {
-						// We just ignore this and continue pulling instead (though
-						// there's a good chance that will fail too, if the DB is
-						// unhealthy).
-						l.Debugf("Failed to get information from DB about block %v in copier (folderID %v, file %v): %v", block.Hash, f.folderID, state.file.Name)
-						break
-					}
-
-					copied, err = f.copyBlock(e.FileName, e.Offset, dstFd, ffs, block, buf)
-					if err != nil {
-						state.fail(err)
-						break folders
-					}
-					if !copied {
-						continue
-					}
-					if e.FileName == state.file.Name {
-						state.copiedFromOrigin(block.Size)
-					} else {
-						state.copiedFromElsewhere(block.Size)
-					}
-					break folders
-				}
+			if f.copyBlock(block, state, otherFolderFilesystems, buf) {
+				state.copyDone(block)
+				continue
 			}
 
 			if state.failed() != nil {
 				break
 			}
 
-			if !copied {
-				state.pullStarted()
-				ps := pullBlockState{
-					sharedPullerState: state.sharedPullerState,
-					block:             block,
-				}
-				pullChan <- ps
-			} else {
-				state.copyDone(block)
+			state.pullStarted()
+			ps := pullBlockState{
+				sharedPullerState: state.sharedPullerState,
+				block:             block,
+			}
+			pullChan <- ps
+		}
+		// If there are no blocks to pull/copy, we still need the temporary file in place.
+		if len(state.blocks) == 0 {
+			_, err := state.tempFile()
+			if err != nil {
+				state.fail(err)
 			}
 		}
 
@@ -1396,25 +1361,75 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 	}
 }
 
-// Returns true, if the block was successfully copied. The returned
-// errors is *only* non-nil for errors that are fatal for pulling this entire
-// file (i.e. writing to dstFd fails). For other errors that mean the block
-// wasn't copied, but we can continue trying, the error is nil and the bool
-// false.
-// The buffer must be big enough to hold the block. It's only purpose is
-// efficiency resp. reuse, the caller must not use/rely on its contents.
-func (f *sendReceiveFolder) copyBlock(srcName string, srcOffset int64, dstFd *lockedWriterAt, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) (bool, error) {
+// Returns true when the block was successfully copied.
+func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem, buf []byte) bool {
+	buf = protocol.BufferPool.Upgrade(buf, int(block.Size))
+
+	// Hope that it's usually in the same folder, so start with that
+	// one. Also possibly more efficient copy (same filesystem).
+	if f.copyBlockFromFolder(f.ID, block, state, f.mtimefs, buf) {
+		return true
+	}
+	if state.failed() != nil {
+		return false
+	}
+
+	for folderID, ffs := range otherFolderFilesystems {
+		if f.copyBlockFromFolder(folderID, block, state, ffs, buf) {
+			return true
+		}
+		if state.failed() != nil {
+			return false
+		}
+	}
+
+	return false
+}
+
+// Returns true when the block was successfully copied.
+// The passed buffer must be large enough to accommodate the block.
+func (f *sendReceiveFolder) copyBlockFromFolder(folderID string, block protocol.BlockInfo, state copyBlocksState, ffs fs.Filesystem, buf []byte) bool {
+	for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
+		if err != nil {
+			// We just ignore this and continue pulling instead (though
+			// there's a good chance that will fail too, if the DB is
+			// unhealthy).
+			l.Debugf("Failed to get information from DB about block %v in copier (folderID %v, file %v): %v", block.Hash, f.folderID, state.file.Name)
+			return false
+		}
+
+		if !f.copyBlockFromFile(e.FileName, e.Offset, state, ffs, block, buf) {
+			if state.failed() != nil {
+				return false
+			}
+			continue
+		}
+
+		if e.FileName == state.file.Name {
+			state.copiedFromOrigin(block.Size)
+		} else {
+			state.copiedFromElsewhere(block.Size)
+		}
+		return true
+	}
+
+	return false
+}
+
+// Returns true when the block was successfully copied.
+// The passed buffer must be large enough to accommodate the block.
+func (f *sendReceiveFolder) copyBlockFromFile(srcName string, srcOffset int64, state copyBlocksState, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) bool {
 	fd, err := ffs.Open(srcName)
 	if err != nil {
 		l.Debugf("Failed to open file %v trying to copy block %v (folderID %v): %v", srcName, block.Hash, f.folderID, err)
-		return false, nil
+		return false
 	}
 	defer fd.Close()
 
 	_, err = fd.ReadAt(buf, srcOffset)
 	if err != nil {
 		l.Debugf("Failed to read block from file %v in copier (folderID: %v, hash: %v): %v", srcName, f.folderID, block.Hash, err)
-		return false, nil
+		return false
 	}
 
 	// Hash is not SHA256 as it's an encrypted hash token. In that
@@ -1423,18 +1438,30 @@ func (f *sendReceiveFolder) copyBlock(srcName string, srcOffset int64, dstFd *lo
 	if f.Type != config.FolderTypeReceiveEncrypted {
 		if err := f.verifyBuffer(buf, block); err != nil {
 			l.Debugf("Failed to verify buffer in copier (folderID: %v): %v", f.folderID, err)
-			return false, nil
+			return false
 		}
 	}
 
+	dstFd, err := state.tempFile()
+	if err != nil {
+		// State is already marked as failed when an error is returned here.
+		return false
+	}
+
 	if f.CopyRangeMethod != config.CopyRangeMethodStandard {
-		return true, f.withLimiter(func() error {
+		err = f.withLimiter(func() error {
 			dstFd.mut.Lock()
 			defer dstFd.mut.Unlock()
 			return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
 		})
+	} else {
+		err = f.limitedWriteAt(dstFd, buf, block.Offset)
 	}
-	return true, f.limitedWriteAt(dstFd, buf, block.Offset)
+	if err != nil {
+		state.fail(fmt.Errorf("dst write: %w", err))
+		return false
+	}
+	return true
 }
 
 func (*sendReceiveFolder) verifyBuffer(buf []byte, block protocol.BlockInfo) error {

+ 12 - 0
lib/model/folder_sendrecv_test.go

@@ -14,6 +14,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"runtime/pprof"
 	"strconv"
 	"strings"
 	"testing"
@@ -680,6 +681,17 @@ func TestCopyOwner(t *testing.T) {
 		expGroup = 5678
 	)
 
+	// This test hung on a regression, taking a long time to fail - speed that up.
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+	go func() {
+		<-ctx.Done()
+		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+			pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
+			panic("timed out before test finished")
+		}
+	}()
+
 	// Set up a folder with the CopyParentOwner bit and backed by a fake
 	// filesystem.