| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569 | 
							- // Copyright (C) 2025 The Syncthing Authors.
 
- //
 
- // This Source Code Form is subject to the terms of the Mozilla Public
 
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
 
- // You can obtain one at https://mozilla.org/MPL/2.0/.
 
- package sqlite
 
- import (
 
- 	"cmp"
 
- 	"context"
 
- 	"fmt"
 
- 	"log/slog"
 
- 	"slices"
 
- 	"github.com/jmoiron/sqlx"
 
- 	"github.com/syncthing/syncthing/internal/gen/dbproto"
 
- 	"github.com/syncthing/syncthing/internal/itererr"
 
- 	"github.com/syncthing/syncthing/internal/slogutil"
 
- 	"github.com/syncthing/syncthing/lib/osutil"
 
- 	"github.com/syncthing/syncthing/lib/protocol"
 
- 	"github.com/syncthing/syncthing/lib/sliceutil"
 
- 	"google.golang.org/protobuf/proto"
 
- )
 
// Weights used by periodicCheckpointLocked to decide when to checkpoint.
// Each updated file and each of its blocks adds points; a checkpoint is
// attempted once the running total exceeds the threshold. The exact numbers
// are arbitrary.
const (
	updatePointsPerBlock  = 1
	updatePointsPerFile   = 100
	updatePointsThreshold = 250000
)
 
- func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error {
 
- 	s.updateLock.Lock()
 
- 	defer s.updateLock.Unlock()
 
- 	deviceIdx, err := s.deviceIdxLocked(device)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	tx, err := s.sql.BeginTxx(context.Background(), nil)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	defer tx.Rollback() //nolint:errcheck
 
- 	txp := &txPreparedStmts{Tx: tx}
 
- 	//nolint:sqlclosecheck
 
- 	insertNameStmt, err := txp.Preparex(`
 
- 		INSERT INTO file_names(name)
 
- 		VALUES (?)
 
- 		ON CONFLICT(name) DO UPDATE
 
- 			SET name = excluded.name
 
- 		RETURNING idx
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err, "prepare insert name")
 
- 	}
 
- 	//nolint:sqlclosecheck
 
- 	insertVersionStmt, err := txp.Preparex(`
 
- 		INSERT INTO file_versions (version)
 
- 		VALUES (?)
 
- 		ON CONFLICT(version) DO UPDATE
 
- 			SET version = excluded.version
 
- 		RETURNING idx
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err, "prepare insert version")
 
- 	}
 
- 	//nolint:sqlclosecheck
 
- 	insertFileStmt, err := txp.Preparex(`
 
- 		INSERT OR REPLACE INTO files (device_idx, remote_sequence, type, modified, size, deleted, local_flags, blocklist_hash, name_idx, version_idx)
 
- 		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 
- 		RETURNING sequence
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err, "prepare insert file")
 
- 	}
 
- 	//nolint:sqlclosecheck
 
- 	insertFileInfoStmt, err := txp.Preparex(`
 
- 		INSERT INTO fileinfos (sequence, fiprotobuf)
 
- 		VALUES (?, ?)
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err, "prepare insert fileinfo")
 
- 	}
 
- 	//nolint:sqlclosecheck
 
- 	insertBlockListStmt, err := txp.Preparex(`
 
- 		INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
 
- 		VALUES (?, ?)
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err, "prepare insert blocklist")
 
- 	}
 
- 	var prevRemoteSeq int64
 
- 	for i, f := range fs {
 
- 		f.Name = osutil.NormalizedFilename(f.Name)
 
- 		var blockshash *[]byte
 
- 		if len(f.Blocks) > 0 {
 
- 			f.BlocksHash = protocol.BlocksHash(f.Blocks)
 
- 			blockshash = &f.BlocksHash
 
- 		} else {
 
- 			f.BlocksHash = nil
 
- 		}
 
- 		if f.Type == protocol.FileInfoTypeDirectory {
 
- 			f.Size = 128 // synthetic directory size
 
- 		}
 
- 		// Insert the file.
 
- 		//
 
- 		// If it is a remote file, set remote_sequence otherwise leave it at
 
- 		// null. Returns the new local sequence.
 
- 		var remoteSeq *int64
 
- 		if device != protocol.LocalDeviceID {
 
- 			if i > 0 && f.Sequence == prevRemoteSeq {
 
- 				return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
 
- 			}
 
- 			prevRemoteSeq = f.Sequence
 
- 			remoteSeq = &f.Sequence
 
- 		}
 
- 		var nameIdx int64
 
- 		if err := insertNameStmt.Get(&nameIdx, f.Name); err != nil {
 
- 			return wrap(err, "insert name")
 
- 		}
 
- 		var versionIdx int64
 
- 		if err := insertVersionStmt.Get(&versionIdx, f.Version.String()); err != nil {
 
- 			return wrap(err, "insert version")
 
- 		}
 
- 		var localSeq int64
 
- 		if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Type, f.ModTime().UnixNano(), f.Size, f.IsDeleted(), f.LocalFlags, blockshash, nameIdx, versionIdx); err != nil {
 
- 			return wrap(err, "insert file")
 
- 		}
 
- 		if len(f.Blocks) > 0 {
 
- 			// Indirect the block list
 
- 			blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
 
- 			bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
 
- 			if err != nil {
 
- 				return wrap(err, "marshal blocklist")
 
- 			}
 
- 			if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
 
- 				return wrap(err, "insert blocklist")
 
- 			} else if device == protocol.LocalDeviceID {
 
- 				// Insert all blocks
 
- 				if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
 
- 					return wrap(err, "insert blocks")
 
- 				}
 
- 			}
 
- 			f.Blocks = nil
 
- 		}
 
- 		// Insert the fileinfo
 
- 		if device == protocol.LocalDeviceID {
 
- 			f.Sequence = localSeq
 
- 		}
 
- 		bs, err := proto.Marshal(f.ToWire(true))
 
- 		if err != nil {
 
- 			return wrap(err, "marshal fileinfo")
 
- 		}
 
- 		if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
 
- 			return wrap(err, "insert fileinfo")
 
- 		}
 
- 		// Update global and need
 
- 		if err := s.recalcGlobalForFile(txp, f.Name); err != nil {
 
- 			return wrap(err)
 
- 		}
 
- 	}
 
- 	if err := tx.Commit(); err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	s.periodicCheckpointLocked(fs)
 
- 	return nil
 
- }
 
- func (s *folderDB) DropDevice(device protocol.DeviceID) error {
 
- 	if device == protocol.LocalDeviceID {
 
- 		panic("bug: cannot drop local device")
 
- 	}
 
- 	s.updateLock.Lock()
 
- 	defer s.updateLock.Unlock()
 
- 	tx, err := s.sql.BeginTxx(context.Background(), nil)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	defer tx.Rollback() //nolint:errcheck
 
- 	txp := &txPreparedStmts{Tx: tx}
 
- 	// Drop the device, which cascades to delete all files etc for it
 
- 	if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	// Recalc the globals for all affected folders
 
- 	if err := s.recalcGlobalForFolder(txp); err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	return wrap(tx.Commit())
 
- }
 
- func (s *folderDB) DropAllFiles(device protocol.DeviceID) error {
 
- 	s.updateLock.Lock()
 
- 	defer s.updateLock.Unlock()
 
- 	// This is a two part operation, first dropping all the files and then
 
- 	// recalculating the global state for the entire folder.
 
- 	deviceIdx, err := s.deviceIdxLocked(device)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	tx, err := s.sql.BeginTxx(context.Background(), nil)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	defer tx.Rollback() //nolint:errcheck
 
- 	txp := &txPreparedStmts{Tx: tx}
 
- 	// Drop all the file entries
 
- 	result, err := tx.Exec(`
 
- 		DELETE FROM files
 
- 		WHERE device_idx = ?
 
- 	`, deviceIdx)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	if n, err := result.RowsAffected(); err == nil && n == 0 {
 
- 		// The delete affected no rows, so we don't need to redo the entire
 
- 		// global/need calculation.
 
- 		return wrap(tx.Commit())
 
- 	}
 
- 	// Recalc global for the entire folder
 
- 	if err := s.recalcGlobalForFolder(txp); err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	return wrap(tx.Commit())
 
- }
 
- func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error {
 
- 	for i := range names {
 
- 		names[i] = osutil.NormalizedFilename(names[i])
 
- 	}
 
- 	s.updateLock.Lock()
 
- 	defer s.updateLock.Unlock()
 
- 	deviceIdx, err := s.deviceIdxLocked(device)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	tx, err := s.sql.BeginTxx(context.Background(), nil)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	defer tx.Rollback() //nolint:errcheck
 
- 	txp := &txPreparedStmts{Tx: tx}
 
- 	// Drop the named files
 
- 	query, args, err := sqlx.In(`
 
- 		DELETE FROM files
 
- 		WHERE device_idx = ? AND name_idx IN (
 
- 			SELECT idx FROM file_names WHERE name IN (?)
 
- 		)
 
- 	`, deviceIdx, names)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	if _, err := tx.Exec(query, args...); err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	// Recalc globals for the named files
 
- 	for _, name := range names {
 
- 		if err := s.recalcGlobalForFile(txp, name); err != nil {
 
- 			return wrap(err)
 
- 		}
 
- 	}
 
- 	return wrap(tx.Commit())
 
- }
 
- func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
 
- 	if len(blocks) == 0 {
 
- 		return nil
 
- 	}
 
- 	bs := make([]map[string]any, len(blocks))
 
- 	for i, b := range blocks {
 
- 		bs[i] = map[string]any{
 
- 			"hash":           b.Hash,
 
- 			"blocklist_hash": blocklistHash,
 
- 			"idx":            i,
 
- 			"offset":         b.Offset,
 
- 			"size":           b.Size,
 
- 		}
 
- 	}
 
- 	// Very large block lists (>8000 blocks) result in "too many variables"
 
- 	// error. Chunk it to a reasonable size.
 
- 	for chunk := range slices.Chunk(bs, 1000) {
 
- 		if _, err := tx.NamedExec(`
 
- 			INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
 
- 			VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
 
- 		`, chunk); err != nil {
 
- 			return wrap(err)
 
- 		}
 
- 	}
 
- 	return nil
 
- }
 
- func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error {
 
- 	// Select files where there is no global, those are the ones we need to
 
- 	// recalculate.
 
- 	//nolint:sqlclosecheck
 
- 	namesStmt, err := txp.Preparex(`
 
- 		SELECT n.name FROM files f
 
- 		INNER JOIN file_names n ON n.idx = f.name_idx
 
- 		WHERE NOT EXISTS (
 
- 			SELECT 1 FROM files g
 
- 			WHERE g.name_idx = f.name_idx AND g.local_flags & ? != 0
 
- 		)
 
- 		GROUP BY n.name
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	defer rows.Close()
 
- 	for rows.Next() {
 
- 		var name string
 
- 		if err := rows.Scan(&name); err != nil {
 
- 			return wrap(err)
 
- 		}
 
- 		if err := s.recalcGlobalForFile(txp, name); err != nil {
 
- 			return wrap(err)
 
- 		}
 
- 	}
 
- 	return wrap(rows.Err())
 
- }
 
- func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error {
 
- 	//nolint:sqlclosecheck
 
- 	selStmt, err := txp.Preparex(`
 
- 		SELECT n.name, f.device_idx, f.sequence, f.modified, v.version, f.deleted, f.local_flags FROM files f
 
- 		INNER JOIN file_versions v ON v.idx = f.version_idx
 
- 		INNER JOIN file_names n ON n.idx = f.name_idx
 
- 		WHERE n.name = ?
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err, "prepare select")
 
- 	}
 
- 	es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file)))
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	if len(es) == 0 {
 
- 		// shouldn't happen
 
- 		return nil
 
- 	}
 
- 	// Sort the entries; the global entry is at the head of the list
 
- 	slices.SortFunc(es, fileRow.Compare)
 
- 	// The global version is the first one in the list that is not invalid,
 
- 	// or just the first one in the list if all are invalid.
 
- 	var global fileRow
 
- 	globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.IsInvalid() })
 
- 	if globIdx < 0 {
 
- 		globIdx = 0
 
- 	}
 
- 	global = es[globIdx]
 
- 	// We "have" the file if the position in the list of versions is at the
 
- 	// global version or better, or if the version is the same as the global
 
- 	// file (we might be further down the list due to invalid flags), or if
 
- 	// the global is deleted and we don't have it at all...
 
- 	localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
 
- 	hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
 
- 		localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
 
- 		localIdx < 0 && global.Deleted // missing it, but the global is also deleted
 
- 	// Set the global flag on the global entry. Set the need flag if the
 
- 	// local device needs this file, unless it's invalid.
 
- 	global.LocalFlags |= protocol.FlagLocalGlobal
 
- 	if hasLocal || global.IsInvalid() {
 
- 		global.LocalFlags &= ^protocol.FlagLocalNeeded
 
- 	} else {
 
- 		global.LocalFlags |= protocol.FlagLocalNeeded
 
- 	}
 
- 	//nolint:sqlclosecheck
 
- 	upStmt, err := txp.Preparex(`
 
- 		UPDATE files SET local_flags = ?
 
- 		WHERE device_idx = ? AND sequence = ?
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	// Clear the need and global flags on all other entries
 
- 	//nolint:sqlclosecheck
 
- 	upStmt, err = txp.Preparex(`
 
- 		UPDATE files SET local_flags = local_flags & ?
 
- 		WHERE name_idx = (SELECT idx FROM file_names WHERE name = ?) AND sequence != ? AND local_flags & ? != 0
 
- 	`)
 
- 	if err != nil {
 
- 		return wrap(err, "prepare update")
 
- 	}
 
- 	if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
 
- 		return wrap(err)
 
- 	}
 
- 	return nil
 
- }
 
- func (s *DB) folderIdxLocked(folderID string) (int64, error) {
 
- 	if _, err := s.stmt(`
 
- 		INSERT OR IGNORE INTO folders(folder_id)
 
- 		VALUES (?)
 
- 	`).Exec(folderID); err != nil {
 
- 		return 0, wrap(err)
 
- 	}
 
- 	var idx int64
 
- 	if err := s.stmt(`
 
- 		SELECT idx FROM folders
 
- 		WHERE folder_id = ?
 
- 	`).Get(&idx, folderID); err != nil {
 
- 		return 0, wrap(err)
 
- 	}
 
- 	return idx, nil
 
- }
 
- // fileRow is one device's entry for a file name, as loaded by
 
- // recalcGlobalForFile to pick the global version. Rows are ordered with
 
- // Compare.
 
- type fileRow struct {
 
- 	Name       string // file name, from file_names.name
 
- 	Version    dbVector // version vector, from file_versions.version
 
- 	DeviceIdx  int64 `db:"device_idx"` // index of the owning device; tie-breaker in Compare
 
- 	Sequence   int64 // sequence of the files row; used to address the row in updates
 
- 	Modified   int64 // modification time; newer wins between conflicting versions
 
- 	Size       int64 // NOTE(review): not selected by the query in recalcGlobalForFile
 
- 	LocalFlags protocol.FlagLocal `db:"local_flags"` // local flag bits, incl. global/needed
 
- 	Deleted    bool // whether this entry is a deletion
 
- }
 
- func (e fileRow) Compare(other fileRow) int {
 
- 	// From FileInfo.WinsConflict
 
- 	vc := e.Version.Compare(other.Version.Vector)
 
- 	switch vc {
 
- 	case protocol.Equal:
 
- 		if e.IsInvalid() != other.IsInvalid() {
 
- 			if e.IsInvalid() {
 
- 				return 1
 
- 			}
 
- 			return -1
 
- 		}
 
- 		// Compare the device ID index, lower is better. This is only
 
- 		// deterministic to the extent that LocalDeviceID will always be the
 
- 		// lowest one, order between remote devices is random (and
 
- 		// irrelevant).
 
- 		return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
 
- 	case protocol.Greater: // we are newer
 
- 		return -1
 
- 	case protocol.Lesser: // we are older
 
- 		return 1
 
- 	case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
 
- 		if e.IsInvalid() != other.IsInvalid() {
 
- 			if e.IsInvalid() { // we are invalid, we lose
 
- 				return 1
 
- 			}
 
- 			return -1 // they are invalid, we win
 
- 		}
 
- 		if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
 
- 			return -d // positive d means we were newer, so we win (negative return)
 
- 		}
 
- 		if vc == protocol.ConcurrentGreater {
 
- 			return -1 // we have a better device ID, we win
 
- 		}
 
- 		return 1 // they win
 
- 	default:
 
- 		return 0
 
- 	}
 
- }
 
- func (e fileRow) IsInvalid() bool {
 
- 	return e.LocalFlags.IsInvalid()
 
- }
 
- func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) {
 
- 	// Induce periodic checkpoints. We add points for each file and block,
 
- 	// and checkpoint when we've written more than a threshold of points.
 
- 	// This ensures we do not go too long without a checkpoint, while also
 
- 	// not doing it incessantly for every update.
 
- 	s.updatePoints += updatePointsPerFile * len(fs)
 
- 	for _, f := range fs {
 
- 		s.updatePoints += len(f.Blocks) * updatePointsPerBlock
 
- 	}
 
- 	if s.updatePoints > updatePointsThreshold {
 
- 		conn, err := s.sql.Conn(context.Background())
 
- 		if err != nil {
 
- 			slog.Debug("Connection error", slog.String("db", s.baseName), slogutil.Error(err))
 
- 			return
 
- 		}
 
- 		defer conn.Close()
 
- 		if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil {
 
- 			slog.Debug("PRAGMA journal_size_limit error", slog.String("db", s.baseName), slogutil.Error(err))
 
- 		}
 
- 		// Every 50th checkpoint becomes a truncate, in an effort to bring
 
- 		// down the size now and then.
 
- 		checkpointType := "RESTART"
 
- 		if s.checkpointsCount > 50 {
 
- 			checkpointType = "TRUNCATE"
 
- 		}
 
- 		cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType)
 
- 		row := conn.QueryRowContext(context.Background(), cmd)
 
- 		var res, modified, moved int
 
- 		if row.Err() != nil {
 
- 			slog.Debug("Command error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
 
- 		} else if err := row.Scan(&res, &modified, &moved); err != nil {
 
- 			slog.Debug("Command scan error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
 
- 		} else {
 
- 			slog.Debug("Checkpoint result", "db", s.baseName, "checkpointscount", s.checkpointsCount, "updatepoints", s.updatePoints, "res", res, "modified", modified, "moved", moved)
 
- 		}
 
- 		// Reset the truncate counter when a truncate succeeded. If it
 
- 		// failed, we'll keep trying it until we succeed. Increase it faster
 
- 		// when we fail to checkpoint, as it's more likely the WAL is
 
- 		// growing and will need truncation when we get out of this state.
 
- 		switch {
 
- 		case res == 1:
 
- 			s.checkpointsCount += 10
 
- 		case res == 0 && checkpointType == "TRUNCATE":
 
- 			s.checkpointsCount = 0
 
- 		default:
 
- 			s.checkpointsCount++
 
- 		}
 
- 		s.updatePoints = 0
 
- 	}
 
- }
 
 
  |