Browse Source

chore(db, model): simplify per hash DB lookup in copier (#10080)

This is a draft because I haven't adjusted all the tests yet, I'd like
to get feedback on the change overall first, before spending time on
that.

In my opinion the main win of this change is in its lower complexity,
i.e. fewer moving parts. It should also be faster as it only does one
query instead of two, but I have no idea if that's practically
relevant.

This also mirrors the v1 DB, where a block map key had the name
appended. Not that this is an argument for the change, it was mostly
reassuring me that I might not be missing something key here
conceptually (I might still be of course, please tell me :) ).

And the change isn't mainly intrinsically motivated, instead it came
up while fixing a bug in the copier. And the nested nature of that code
makes the fix harder, and "un-nesting" it required me to understand
what's happening. This change fell out of that.
Simon Frei 9 months ago
parent
commit
6b94599467

+ 2 - 2
internal/db/interface.go

@@ -41,8 +41,7 @@ type DB interface {
 	AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error)
 	AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Seq[FileMetadata], func() error)
 	AllNeededGlobalFiles(folder string, device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error)
-	AllLocalBlocksWithHash(hash []byte) ([]BlockMapEntry, error)
-	AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]FileMetadata, error)
+	AllLocalBlocksWithHash(folder string, hash []byte) (iter.Seq[BlockMapEntry], func() error)
 
 	// Cleanup
 	DropAllFiles(folder string, device protocol.DeviceID) error
@@ -88,6 +87,7 @@ type BlockMapEntry struct {
 	Offset        int64
 	BlockIndex    int
 	Size          int
+	FileName      string
 }
 
 type KeyValue struct {

+ 2 - 7
internal/db/metrics.go

@@ -67,11 +67,6 @@ func (m metricsDB) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Se
 	return m.DB.AllLocalFilesWithBlocksHash(folder, h)
 }
 
-func (m metricsDB) AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]FileMetadata, error) {
-	defer m.account("-", "AllLocalFilesWithBlocksHashAnyFolder")()
-	return m.DB.AllLocalFilesWithBlocksHashAnyFolder(hash)
-}
-
 func (m metricsDB) AllGlobalFiles(folder string) (iter.Seq[FileMetadata], func() error) {
 	defer m.account(folder, "AllGlobalFiles")()
 	return m.DB.AllGlobalFiles(folder)
@@ -107,9 +102,9 @@ func (m metricsDB) GetGlobalAvailability(folder, file string) ([]protocol.Device
 	return m.DB.GetGlobalAvailability(folder, file)
 }
 
-func (m metricsDB) AllLocalBlocksWithHash(hash []byte) ([]BlockMapEntry, error) {
+func (m metricsDB) AllLocalBlocksWithHash(folder string, hash []byte) (iter.Seq[BlockMapEntry], func() error) {
 	defer m.account("-", "AllLocalBlocksWithHash")()
-	return m.DB.AllLocalBlocksWithHash(hash)
+	return m.DB.AllLocalBlocksWithHash(folder, hash)
 }
 
 func (m metricsDB) Close() error {

+ 9 - 19
internal/db/sqlite/db_folderdb.go

@@ -16,7 +16,6 @@ import (
 	"time"
 
 	"github.com/syncthing/syncthing/internal/db"
-	"github.com/syncthing/syncthing/internal/itererr"
 	"github.com/syncthing/syncthing/lib/config"
 	"github.com/syncthing/syncthing/lib/protocol"
 	"github.com/syncthing/syncthing/lib/rand"
@@ -154,24 +153,15 @@ func (s *DB) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[db.Fil
 	return fdb.AllGlobalFilesPrefix(prefix)
 }
 
-func (s *DB) AllLocalBlocksWithHash(hash []byte) ([]db.BlockMapEntry, error) {
-	var entries []db.BlockMapEntry
-	err := s.forEachFolder(func(fdb *folderDB) error {
-		es, err := itererr.Collect(fdb.AllLocalBlocksWithHash(hash))
-		entries = append(entries, es...)
-		return err
-	})
-	return entries, err
-}
-
-func (s *DB) AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]db.FileMetadata, error) {
-	res := make(map[string][]db.FileMetadata)
-	err := s.forEachFolder(func(fdb *folderDB) error {
-		files, err := itererr.Collect(fdb.AllLocalFilesWithBlocksHash(hash))
-		res[fdb.folderID] = files
-		return err
-	})
-	return res, err
+func (s *DB) AllLocalBlocksWithHash(folder string, hash []byte) (iter.Seq[db.BlockMapEntry], func() error) {
+	fdb, err := s.getFolderDB(folder, false)
+	if errors.Is(err, errNoSuchFolder) {
+		return func(yield func(db.BlockMapEntry) bool) {}, func() error { return nil }
+	}
+	if err != nil {
+		return func(yield func(db.BlockMapEntry) bool) {}, func() error { return err }
+	}
+	return fdb.AllLocalBlocksWithHash(hash)
 }
 
 func (s *DB) AllLocalFiles(folder string, device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) {

+ 7 - 19
internal/db/sqlite/db_local_test.go

@@ -9,6 +9,7 @@ package sqlite
 import (
 	"testing"
 
+	"github.com/syncthing/syncthing/internal/itererr"
 	"github.com/syncthing/syncthing/lib/protocol"
 )
 
@@ -50,7 +51,7 @@ func TestBlocks(t *testing.T) {
 
 	// Search for blocks
 
-	vals, err := db.AllLocalBlocksWithHash([]byte{1, 2, 3})
+	vals, err := itererr.Collect(db.AllLocalBlocksWithHash(folderID, []byte{1, 2, 3}))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -61,26 +62,13 @@ func TestBlocks(t *testing.T) {
 		t.Log(vals[0])
 		t.Fatal("bad entry")
 	}
-
-	// Get FileInfos for those blocks
-
-	res, err := db.AllLocalFilesWithBlocksHashAnyFolder(vals[0].BlocklistHash)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(res) != 1 {
-		t.Fatal("should return one folder")
-	}
-	if len(res[folderID]) != 1 {
-		t.Fatal("should find one file")
-	}
-	if res[folderID][0].Name != "file1" {
+	if vals[0].FileName != "file1" {
 		t.Fatal("should be file1")
 	}
 
 	// Get the other blocks
 
-	vals, err = db.AllLocalBlocksWithHash([]byte{3, 4, 5})
+	vals, err = itererr.Collect(db.AllLocalBlocksWithHash(folderID, []byte{3, 4, 5}))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -119,7 +107,7 @@ func TestBlocksDeleted(t *testing.T) {
 
 	// We should find one entry for the block hash
 	search := file.Blocks[0].Hash
-	es, err := sdb.AllLocalBlocksWithHash(search)
+	es, err := itererr.Collect(sdb.AllLocalBlocksWithHash(folderID, search))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -134,7 +122,7 @@ func TestBlocksDeleted(t *testing.T) {
 	}
 
 	// Searching for the old hash should yield no hits
-	if hits, err := sdb.AllLocalBlocksWithHash(search); err != nil {
+	if hits, err := itererr.Collect(sdb.AllLocalBlocksWithHash(folderID, search)); err != nil {
 		t.Fatal(err)
 	} else if len(hits) != 0 {
 		t.Log(hits)
@@ -142,7 +130,7 @@ func TestBlocksDeleted(t *testing.T) {
 	}
 
 	// Searching for the new hash should yield one hit
-	if hits, err := sdb.AllLocalBlocksWithHash(file.Blocks[0].Hash); err != nil {
+	if hits, err := itererr.Collect(sdb.AllLocalBlocksWithHash(folderID, file.Blocks[0].Hash)); err != nil {
 		t.Fatal(err)
 	} else if len(hits) != 1 {
 		t.Log(hits)

+ 1 - 1
internal/db/sqlite/db_test.go

@@ -1084,7 +1084,7 @@ func TestInsertLargeFile(t *testing.T) {
 	// Verify all the blocks are here
 
 	for i, block := range files[0].Blocks {
-		bs, err := sdb.AllLocalBlocksWithHash(block.Hash)
+		bs, err := itererr.Collect(sdb.AllLocalBlocksWithHash(folderID, block.Hash))
 		if err != nil {
 			t.Fatal(err)
 		}

+ 1 - 1
internal/db/sqlite/folderdb_local.go

@@ -99,7 +99,7 @@ func (s *folderDB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[db.BlockMapEntr
 	// & blocklists is deferred (garbage collected) while the files list is
 	// not. This filters out blocks that are in fact deleted.
 	return iterStructs[db.BlockMapEntry](s.stmt(`
-		SELECT f.blocklist_hash as blocklisthash, b.idx as blockindex, b.offset, b.size FROM files f
+		SELECT f.blocklist_hash as blocklisthash, b.idx as blockindex, b.offset, b.size, f.name as filename FROM files f
 		LEFT JOIN blocks b ON f.blocklist_hash = b.blocklist_hash
 		WHERE f.device_idx = {{.LocalDeviceIdx}} AND b.hash = ?
 	`).Queryx(hash))

+ 25 - 25
lib/model/folder_sendrecv.go

@@ -1343,33 +1343,33 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 			}
 
 			buf = protocol.BufferPool.Upgrade(buf, int(block.Size))
-
 			copied := false
-			blocks, _ := f.model.sdb.AllLocalBlocksWithHash(block.Hash)
-		innerBlocks:
-			for _, e := range blocks {
-				res, err := f.model.sdb.AllLocalFilesWithBlocksHashAnyFolder(e.BlocklistHash)
-				if err != nil {
-					continue
-				}
-				for folderID, files := range res {
-					ffs := folderFilesystems[folderID]
-					for _, fi := range files {
-						copied, err = f.copyBlock(fi.Name, e.Offset, dstFd, ffs, block, buf)
-						if err != nil {
-							state.fail(err)
-							break innerBlocks
-						}
-						if !copied {
-							continue
-						}
-						if fi.Name == state.file.Name {
-							state.copiedFromOrigin(block.Size)
-						} else {
-							state.copiedFromElsewhere(block.Size)
-						}
-						break innerBlocks
+
+		folders:
+			for folderID, ffs := range folderFilesystems {
+				for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
+					if err != nil {
+						// We just ignore this and continue pulling instead (though
+						// there's a good chance that will fail too, if the DB is
+						// unhealthy).
+						l.Debugf("Failed to get information from DB about block %v in copier (folderID %v, file %v): %v", block.Hash, f.folderID, state.file.Name)
+						break
+					}
+
+					copied, err = f.copyBlock(e.FileName, e.Offset, dstFd, ffs, block, buf)
+					if err != nil {
+						state.fail(err)
+						break folders
+					}
+					if !copied {
+						continue
+					}
+					if e.FileName == state.file.Name {
+						state.copiedFromOrigin(block.Size)
+					} else {
+						state.copiedFromElsewhere(block.Size)
 					}
+					break folders
 				}
 			}
 

+ 5 - 4
lib/model/folder_sendrecv_test.go

@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/syncthing/syncthing/internal/itererr"
 	"github.com/syncthing/syncthing/lib/build"
 	"github.com/syncthing/syncthing/lib/config"
 	"github.com/syncthing/syncthing/lib/events"
@@ -325,11 +326,11 @@ func TestCopierCleanup(t *testing.T) {
 	// Update index (removing old blocks)
 	f.updateLocalsFromScanning([]protocol.FileInfo{file})
 
-	if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[0].Hash); err != nil || len(vals) > 0 {
+	if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(f.ID, blocks[0].Hash)); err != nil || len(vals) > 0 {
 		t.Error("Unexpected block found")
 	}
 
-	if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[1].Hash); err != nil || len(vals) == 0 {
+	if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(f.ID, blocks[1].Hash)); err != nil || len(vals) == 0 {
 		t.Error("Expected block not found")
 	}
 
@@ -338,11 +339,11 @@ func TestCopierCleanup(t *testing.T) {
 	// Update index (removing old blocks)
 	f.updateLocalsFromScanning([]protocol.FileInfo{file})
 
-	if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[0].Hash); err != nil || len(vals) == 0 {
+	if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(f.ID, blocks[0].Hash)); err != nil || len(vals) == 0 {
 		t.Error("Unexpected block found")
 	}
 
-	if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[1].Hash); err != nil || len(vals) > 0 {
+	if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(f.ID, blocks[1].Hash)); err != nil || len(vals) > 0 {
 		t.Error("Expected block not found")
 	}
 }