
chore(db): buffer pulled files for smaller WAL (#10036)

We can't hold a long select open while pulling.
Jakob Borg, 6 months ago
parent
commit
fa3b9acca3
3 changed files with 20 additions and 7 deletions
  1. internal/db/sqlite/db_update.go (+7, -2)
  2. lib/model/folder_sendrecv.go (+12, -4)
  3. lib/model/indexhandler.go (+1, -1)
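The commit message's point is that in WAL mode a long-lived SELECT keeps a read transaction open, and an open reader prevents checkpoints from resetting the WAL, so the log grows for as long as the pull runs. A minimal sketch of the pattern the commit moves to, outside of Syncthing's own types: drain the query into memory first, then do the slow per-item work with no reader open. The driver and schema below are assumptions made for the sake of a runnable demo, not part of the commit.

```go
// Sketch only: buffer a result set so the read transaction ends before
// slow per-item work starts. Driver and table are hypothetical.
package main

import (
	"database/sql"
	"fmt"
	"time"

	_ "modernc.org/sqlite" // any SQLite driver works; this one is an assumption
)

func main() {
	db, err := sql.Open("sqlite", "demo.db")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	if _, err := db.Exec(`PRAGMA journal_mode = WAL`); err != nil {
		panic(err)
	}
	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS needed (name TEXT)`); err != nil {
		panic(err)
	}

	// Drain the whole result set. Once the rows are consumed and closed,
	// no read transaction pins the WAL, so checkpoints can truncate it
	// while the slow loop below runs.
	rows, err := db.Query(`SELECT name FROM needed ORDER BY name`)
	if err != nil {
		panic(err)
	}
	var names []string
	for rows.Next() {
		var n string
		if err := rows.Scan(&n); err != nil {
			panic(err)
		}
		names = append(names, n)
	}
	if err := rows.Err(); err != nil {
		panic(err)
	}
	rows.Close()

	for _, n := range names {
		fmt.Println("pulling", n)        // stands in for file and network I/O
		time.Sleep(10 * time.Millisecond) // simulated slow work
	}
}
```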

internal/db/sqlite/db_update.go (+7, -2)

@@ -577,7 +577,6 @@ func (s *DB) periodicCheckpointLocked(fs []protocol.FileInfo) {
 		s.updatePoints += len(f.Blocks) * updatePointsPerBlock
 	}
 	if s.updatePoints > updatePointsThreshold {
-		l.Debugln("checkpoint at", s.updatePoints)
 		conn, err := s.sql.Conn(context.Background())
 		if err != nil {
 			l.Debugln("conn:", err)
@@ -587,8 +586,14 @@ func (s *DB) periodicCheckpointLocked(fs []protocol.FileInfo) {
 		if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 67108864`); err != nil {
 			l.Debugln("PRAGMA journal_size_limit:", err)
 		}
-		if _, err := conn.ExecContext(context.Background(), `PRAGMA wal_checkpoint(RESTART)`); err != nil {
+		row := conn.QueryRowContext(context.Background(), `PRAGMA wal_checkpoint(RESTART)`)
+		var res, modified, moved int
+		if row.Err() != nil {
 			l.Debugln("PRAGMA wal_checkpoint(RESTART):", err)
+		} else if err := row.Scan(&res, &modified, &moved); err != nil {
+			l.Debugln("PRAGMA wal_checkpoint(RESTART) (scan):", err)
+		} else {
+			l.Debugln("checkpoint at", s.updatePoints, "returned", res, modified, moved)
 		}
 		s.updatePoints = 0
 	}
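This hunk switches from ExecContext to QueryRowContext because `PRAGMA wal_checkpoint(RESTART)` returns a result row of three integers: a busy flag, the number of frames in the WAL, and the number of frames checkpointed (per the SQLite documentation). Reading them makes the debug line actually report what the checkpoint did. A small standalone sketch of the same idea with plain database/sql:

```go
// Sketch of reading the wal_checkpoint result columns with database/sql;
// not the Syncthing wrapper code.
package walcheckpoint

import (
	"context"
	"database/sql"
	"log"
)

func checkpointRestart(ctx context.Context, db *sql.DB) error {
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()

	// The pragma returns one row: busy flag, WAL frames, frames checkpointed.
	var busy, logFrames, checkpointed int
	row := conn.QueryRowContext(ctx, `PRAGMA wal_checkpoint(RESTART)`)
	if err := row.Scan(&busy, &logFrames, &checkpointed); err != nil {
		return err
	}
	log.Printf("wal_checkpoint(RESTART): busy=%d log=%d checkpointed=%d",
		busy, logFrames, checkpointed)
	return nil
}
```

Note that `sql.Row.Scan` also surfaces any error from running the query, so a separate `row.Err()` check before scanning, as in the hunk above, is an early-out rather than a requirement.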

lib/model/folder_sendrecv.go (+12, -4)

@@ -315,15 +315,23 @@ func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyC
 	fileDeletions := map[string]protocol.FileInfo{}
 	buckets := map[string][]protocol.FileInfo{}
 
+	// Buffer the full list of needed files. This is somewhat wasteful and
+	// uses a lot of memory, but we need to keep the duration of the
+	// database read short and not do a bunch of file and data I/O inside
+	// the loop. If we forgo the ability for users to reprioritize the pull
+	// queue on the fly we could do this in batches, though that would also
+	// be a bit slower and less efficient in other ways.
+	files, err := itererr.Collect(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0))
+	if err != nil {
+		return changed, nil, nil, err
+	}
+
 	// Iterate the list of items that we need and sort them into piles.
 	// Regular files to pull goes into the file queue, everything else
 	// (directories, symlinks and deletes) goes into the "process directly"
 	// pile.
 loop:
-	for file, err := range itererr.Zip(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) {
-		if err != nil {
-			return changed, nil, nil, err
-		}
+	for _, file := range files {
 		select {
 		case <-f.ctx.Done():
 			break loop
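Here the streaming iteration over `AllNeededGlobalFiles` is replaced by `itererr.Collect`, buffering the full list so the database read finishes before any file or network I/O begins. A rough sketch of what such a collect helper over a sequence-plus-error pair could look like; the real itererr package's signatures may differ.

```go
// Rough sketch in the spirit of Syncthing's itererr.Collect; the actual
// package may be shaped differently.
package itersketch

import "iter"

// Collect drains seq into a slice and then reports whatever error the
// producer recorded. Buffering keeps the underlying database read short
// at the cost of holding the whole list in memory.
func Collect[T any](seq iter.Seq[T], errFn func() error) ([]T, error) {
	var out []T
	for v := range seq {
		out = append(out, v)
	}
	return out, errFn()
}
```

Callers then range over the returned slice, which is exactly what the rewritten loop body above does.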

lib/model/indexhandler.go (+1, -1)

@@ -295,7 +295,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context) error {
 	var f protocol.FileInfo
 	previousWasDelete := false
 
-	for fi, err := range itererr.Zip(s.sdb.AllLocalFilesBySequence(s.folder, protocol.LocalDeviceID, s.localPrevSequence+1, 5000)) {
+	for fi, err := range itererr.Zip(s.sdb.AllLocalFilesBySequence(s.folder, protocol.LocalDeviceID, s.localPrevSequence+1, MaxBatchSizeFiles+1)) {
 		if err != nil {
 			return err
 		}
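The last hunk replaces the hardcoded limit of 5000 with `MaxBatchSizeFiles+1`. The commit doesn't say why the extra item; a common reason for a limit-plus-one query is to learn, without issuing a second query, whether another batch remains. That reading is an assumption, illustrated here with made-up table and column names.

```go
// Illustration of the limit+1 idiom with database/sql. The schema is
// hypothetical and the connection to MaxBatchSizeFiles+1 is an assumption,
// not something the commit states.
package batchsketch

import (
	"context"
	"database/sql"
)

// loadBatch returns up to batchSize names after the given sequence number,
// plus a flag saying whether more rows exist beyond this batch.
func loadBatch(ctx context.Context, db *sql.DB, afterSeq int64, batchSize int) ([]string, bool, error) {
	rows, err := db.QueryContext(ctx,
		`SELECT name FROM files WHERE sequence > ? ORDER BY sequence LIMIT ?`,
		afterSeq, batchSize+1)
	if err != nil {
		return nil, false, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var n string
		if err := rows.Scan(&n); err != nil {
			return nil, false, err
		}
		names = append(names, n)
	}
	if err := rows.Err(); err != nil {
		return nil, false, err
	}

	// An extra row beyond batchSize means at least one more batch remains.
	if len(names) > batchSize {
		return names[:batchSize], true, nil
	}
	return names, false, nil
}
```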