123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- // Copyright (C) 2025 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package sqlite
- import (
- "context"
- "encoding/binary"
- "fmt"
- "log/slog"
- "math/rand"
- "strings"
- "time"
- "github.com/jmoiron/sqlx"
- "github.com/syncthing/syncthing/internal/db"
- "github.com/syncthing/syncthing/internal/slogutil"
- "github.com/syncthing/syncthing/lib/protocol"
- "github.com/thejerf/suture/v4"
- )
// Keys used by the service for its own bookkeeping in the database.
const (
	// internalMetaPrefix is the prefix passed to db.NewTyped for the
	// service's metadata store.
	internalMetaPrefix = "dbsvc"

	// lastMaintKey stores the time at which periodic maintenance last
	// completed.
	lastMaintKey = "lastMaint"

	// lastSuccessfulGCSeqKey stores, per folder, the local device sequence
	// observed at the last completed garbage collection.
	lastSuccessfulGCSeqKey = "lastSuccessfulGCSeq"
)

// Tuning knobs for the chunked blocklist/block garbage collection.
const (
	gcMinChunks  = 5               // lower bound on the number of chunks per table
	gcChunkSize  = 100_000         // approximate number of rows to process in a single gc query
	gcMaxRuntime = 5 * time.Minute // max time to spend on gc, per table, per run
)
- func (s *DB) Service(maintenanceInterval time.Duration) suture.Service {
- return newService(s, maintenanceInterval)
- }
- type Service struct {
- sdb *DB
- maintenanceInterval time.Duration
- internalMeta *db.Typed
- }
- func (s *Service) String() string {
- return fmt.Sprintf("sqlite.service@%p", s)
- }
- func newService(sdb *DB, maintenanceInterval time.Duration) *Service {
- return &Service{
- sdb: sdb,
- maintenanceInterval: maintenanceInterval,
- internalMeta: db.NewTyped(sdb, internalMetaPrefix),
- }
- }
- func (s *Service) Serve(ctx context.Context) error {
- // Run periodic maintenance
- // Figure out when we last ran maintenance and schedule accordingly. If
- // it was never, do it now.
- lastMaint, _, _ := s.internalMeta.Time(lastMaintKey)
- nextMaint := lastMaint.Add(s.maintenanceInterval)
- wait := time.Until(nextMaint)
- if wait < 0 {
- wait = time.Minute
- }
- slog.DebugContext(ctx, "Next periodic run due", "after", wait)
- timer := time.NewTimer(wait)
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-timer.C:
- }
- if err := s.periodic(ctx); err != nil {
- return wrap(err)
- }
- timer.Reset(s.maintenanceInterval)
- slog.DebugContext(ctx, "Next periodic run due", "after", s.maintenanceInterval)
- _ = s.internalMeta.PutTime(lastMaintKey, time.Now())
- }
- }
- func (s *Service) periodic(ctx context.Context) error {
- t0 := time.Now()
- slog.DebugContext(ctx, "Periodic start")
- t1 := time.Now()
- defer func() { slog.DebugContext(ctx, "Periodic done in", "t1", time.Since(t1), "t0t1", t1.Sub(t0)) }()
- s.sdb.updateLock.Lock()
- err := tidy(ctx, s.sdb.sql)
- s.sdb.updateLock.Unlock()
- if err != nil {
- return err
- }
- return wrap(s.sdb.forEachFolder(func(fdb *folderDB) error {
- // Get the current device sequence, for comparison in the next step.
- seq, err := fdb.GetDeviceSequence(protocol.LocalDeviceID)
- if err != nil {
- return wrap(err)
- }
- // Get the last successful GC sequence. If it's the same as the
- // current sequence, nothing has changed and we can skip the GC
- // entirely.
- meta := db.NewTyped(fdb, internalMetaPrefix)
- if prev, _, err := meta.Int64(lastSuccessfulGCSeqKey); err != nil {
- return wrap(err)
- } else if seq == prev {
- slog.DebugContext(ctx, "Skipping unnecessary GC", "folder", fdb.folderID, "fdb", fdb.baseName)
- return nil
- }
- // Run the GC steps, in a function to be able to use a deferred
- // unlock.
- if err := func() error {
- fdb.updateLock.Lock()
- defer fdb.updateLock.Unlock()
- if err := garbageCollectOldDeletedLocked(ctx, fdb); err != nil {
- return wrap(err)
- }
- if err := garbageCollectNamesAndVersions(ctx, fdb); err != nil {
- return wrap(err)
- }
- if err := garbageCollectBlocklistsAndBlocksLocked(ctx, fdb); err != nil {
- return wrap(err)
- }
- return tidy(ctx, fdb.sql)
- }(); err != nil {
- return wrap(err)
- }
- // Update the successful GC sequence.
- return wrap(meta.PutInt64(lastSuccessfulGCSeqKey, seq))
- }))
- }
- func tidy(ctx context.Context, db *sqlx.DB) error {
- conn, err := db.Conn(ctx)
- if err != nil {
- return wrap(err)
- }
- defer conn.Close()
- _, _ = conn.ExecContext(ctx, `ANALYZE`)
- _, _ = conn.ExecContext(ctx, `PRAGMA optimize`)
- _, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`)
- _, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 8388608`)
- _, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
- return nil
- }
- func garbageCollectNamesAndVersions(ctx context.Context, fdb *folderDB) error {
- l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName)
- res, err := fdb.stmt(`
- DELETE FROM file_names
- WHERE NOT EXISTS (SELECT 1 FROM files f WHERE f.name_idx = idx)
- `).Exec()
- if err != nil {
- return wrap(err, "delete names")
- }
- if aff, err := res.RowsAffected(); err == nil {
- l.DebugContext(ctx, "Removed old file names", "affected", aff)
- }
- res, err = fdb.stmt(`
- DELETE FROM file_versions
- WHERE NOT EXISTS (SELECT 1 FROM files f WHERE f.version_idx = idx)
- `).Exec()
- if err != nil {
- return wrap(err, "delete versions")
- }
- if aff, err := res.RowsAffected(); err == nil {
- l.DebugContext(ctx, "Removed old file versions", "affected", aff)
- }
- return nil
- }
- func garbageCollectOldDeletedLocked(ctx context.Context, fdb *folderDB) error {
- l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName)
- if fdb.deleteRetention <= 0 {
- slog.DebugContext(ctx, "Delete retention is infinite, skipping cleanup")
- return nil
- }
- // Remove deleted files that are marked as not needed (we have processed
- // them) and they were deleted more than MaxDeletedFileAge ago.
- l.DebugContext(ctx, "Forgetting deleted files", "retention", fdb.deleteRetention)
- res, err := fdb.stmt(`
- DELETE FROM files
- WHERE deleted AND modified < ? AND local_flags & {{.FlagLocalNeeded}} == 0
- `).Exec(time.Now().Add(-fdb.deleteRetention).UnixNano())
- if err != nil {
- return wrap(err)
- }
- if aff, err := res.RowsAffected(); err == nil {
- l.DebugContext(ctx, "Removed old deleted file records", "affected", aff)
- }
- return nil
- }
- func garbageCollectBlocklistsAndBlocksLocked(ctx context.Context, fdb *folderDB) error {
- // Remove all blocklists not referred to by any files and, by extension,
- // any blocks not referred to by a blocklist. This is an expensive
- // operation when run normally, especially if there are a lot of blocks
- // to collect.
- //
- // We make this orders of magnitude faster by disabling foreign keys for
- // the transaction and doing the cleanup manually. This requires using
- // an explicit connection and disabling foreign keys before starting the
- // transaction. We make sure to clean up on the way out.
- conn, err := fdb.sql.Connx(ctx)
- if err != nil {
- return wrap(err)
- }
- defer conn.Close()
- if _, err := conn.ExecContext(ctx, `PRAGMA foreign_keys = 0`); err != nil {
- return wrap(err)
- }
- defer func() { //nolint:contextcheck
- _, _ = conn.ExecContext(context.Background(), `PRAGMA foreign_keys = 1`)
- }()
- tx, err := conn.BeginTxx(ctx, nil)
- if err != nil {
- return wrap(err)
- }
- defer tx.Rollback() //nolint:errcheck
- // Both blocklists and blocks refer to blocklists_hash from the files table.
- for _, table := range []string{"blocklists", "blocks"} {
- // Count the number of rows
- var rows int64
- if err := tx.GetContext(ctx, &rows, `SELECT count(*) FROM `+table); err != nil {
- return wrap(err)
- }
- chunks := max(gcMinChunks, rows/gcChunkSize)
- l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName, "table", table, "rows", rows, "chunks", chunks)
- // Process rows in chunks up to a given time limit. We always use at
- // least gcMinChunks chunks, then increase the number as the number of rows
- // exceeds gcMinChunks*gcChunkSize.
- t0 := time.Now()
- for i, br := range randomBlobRanges(int(chunks)) {
- if d := time.Since(t0); d > gcMaxRuntime {
- l.InfoContext(ctx, "GC was interrupted due to exceeding time limit", "processed", i, "runtime", time.Since(t0))
- break
- }
- // The limit column must be an indexed column with a mostly random distribution of blobs.
- // That's the blocklist_hash column for blocklists, and the hash column for blocks.
- limitColumn := table + ".blocklist_hash"
- if table == "blocks" {
- limitColumn = "blocks.hash"
- }
- q := fmt.Sprintf(`
- DELETE FROM %s
- WHERE %s AND NOT EXISTS (
- SELECT 1 FROM files WHERE files.blocklist_hash = %s.blocklist_hash
- )`, table, br.SQL(limitColumn), table)
- if res, err := tx.ExecContext(ctx, q); err != nil {
- return wrap(err, "delete from "+table)
- } else {
- l.DebugContext(ctx, "GC query result", "processed", i, "runtime", time.Since(t0), "result", slogutil.Expensive(func() any {
- rows, err := res.RowsAffected()
- if err != nil {
- return slogutil.Error(err)
- }
- return slog.Int64("rows", rows)
- }))
- }
- }
- }
- return wrap(tx.Commit())
- }
- // blobRange defines a range for blob searching. A range is open ended if
- // start or end is nil.
- type blobRange struct {
- start, end []byte
- }
- // SQL returns the SQL where clause for the given range, e.g.
- // `column >= x'49249248' AND column < x'6db6db6c'`
- func (r blobRange) SQL(name string) string {
- var sb strings.Builder
- if r.start != nil {
- fmt.Fprintf(&sb, "%s >= x'%x'", name, r.start)
- }
- if r.start != nil && r.end != nil {
- sb.WriteString(" AND ")
- }
- if r.end != nil {
- fmt.Fprintf(&sb, "%s < x'%x'", name, r.end)
- }
- return sb.String()
- }
- // randomBlobRanges returns n blobRanges in random order
- func randomBlobRanges(n int) []blobRange {
- ranges := blobRanges(n)
- rand.Shuffle(len(ranges), func(i, j int) { ranges[i], ranges[j] = ranges[j], ranges[i] })
- return ranges
- }
- // blobRanges returns n blobRanges
- func blobRanges(n int) []blobRange {
- // We use three byte (24 bit) prefixes to get fairly granular ranges and easy bit
- // conversions.
- rangeSize := (1 << 24) / n
- ranges := make([]blobRange, 0, n)
- var prev []byte
- for i := range n {
- var pref []byte
- if i < n-1 {
- end := (i + 1) * rangeSize
- pref = intToBlob(end)
- }
- ranges = append(ranges, blobRange{prev, pref})
- prev = pref
- }
- return ranges
- }
- func intToBlob(n int) []byte {
- var pref [4]byte
- binary.BigEndian.PutUint32(pref[:], uint32(n)) //nolint:gosec
- // first byte is always zero and not part of the range
- return pref[1:]
- }
|