@@ -8,8 +8,11 @@ package sqlite
 
 import (
 	"context"
+	"encoding/binary"
 	"fmt"
 	"log/slog"
+	"math/rand"
+	"strings"
 	"time"
 
 	"github.com/jmoiron/sqlx"
@@ -21,6 +24,10 @@ import (
 const (
 	internalMetaPrefix = "dbsvc"
 	lastMaintKey       = "lastMaint"
+
+	gcMinChunks  = 5
+	gcChunkSize  = 100_000         // approximate number of rows to process in a single GC query
+	gcMaxRuntime = 5 * time.Minute // max time to spend on GC, per table, per run
 )
 
 func (s *DB) Service(maintenanceInterval time.Duration) suture.Service {
@@ -119,7 +126,7 @@ func tidy(ctx context.Context, db *sqlx.DB) error {
 }
 
 func garbageCollectOldDeletedLocked(ctx context.Context, fdb *folderDB) error {
-	l := slog.With("fdb", fdb.baseDB)
+	l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName)
 	if fdb.deleteRetention <= 0 {
 		slog.DebugContext(ctx, "Delete retention is infinite, skipping cleanup")
 		return nil
@@ -171,37 +178,116 @@ func garbageCollectBlocklistsAndBlocksLocked(ctx context.Context, fdb *folderDB)
 	}
 	defer tx.Rollback() //nolint:errcheck
 
-	if res, err := tx.ExecContext(ctx, `
-		DELETE FROM blocklists
-		WHERE NOT EXISTS (
-			SELECT 1 FROM files WHERE files.blocklist_hash = blocklists.blocklist_hash
-		)`); err != nil {
-		return wrap(err, "delete blocklists")
-	} else {
-		slog.DebugContext(ctx, "Blocklist GC", "fdb", fdb.baseName, "result", slogutil.Expensive(func() any {
-			rows, err := res.RowsAffected()
-			if err != nil {
-				return slogutil.Error(err)
+	// Both blocklists and blocks refer to blocklist_hash from the files table.
+	for _, table := range []string{"blocklists", "blocks"} {
+		// Count the number of rows in the table.
+		var rows int64
+		if err := tx.GetContext(ctx, &rows, `SELECT count(*) FROM `+table); err != nil {
+			return wrap(err)
+		}
+
+		chunks := max(gcMinChunks, rows/gcChunkSize)
+		l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName, "table", table, "rows", rows, "chunks", chunks)
+
+		// Process rows in chunks up to a given time limit. We always use at
+		// least gcMinChunks chunks, then increase the chunk count as the
+		// number of rows exceeds gcMinChunks*gcChunkSize.
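+		// E.g., 1,234,567 rows yield max(5, 12) = 12 chunks.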
+		t0 := time.Now()
+		for i, br := range randomBlobRanges(int(chunks)) {
+			if d := time.Since(t0); d > gcMaxRuntime {
+				l.InfoContext(ctx, "GC was interrupted due to exceeding the time limit", "processed", i, "runtime", d)
+				break
 			}
-			return slog.Int64("rows", rows)
-		}))
-	}
-
-	if res, err := tx.ExecContext(ctx, `
-		DELETE FROM blocks
-		WHERE NOT EXISTS (
-			SELECT 1 FROM blocklists WHERE blocklists.blocklist_hash = blocks.blocklist_hash
-		)`); err != nil {
-		return wrap(err, "delete blocks")
-	} else {
-		slog.DebugContext(ctx, "Blocks GC", "fdb", fdb.baseName, "result", slogutil.Expensive(func() any {
-			rows, err := res.RowsAffected()
-			if err != nil {
-				return slogutil.Error(err)
+
+			// The limit column must be an indexed column with a mostly random distribution of blobs.
+			// That's the blocklist_hash column for blocklists, and the hash column for blocks.
+			limitColumn := table + ".blocklist_hash"
+			if table == "blocks" {
+				limitColumn = "blocks.hash"
 			}
-			return slog.Int64("rows", rows)
-		}))
+
+			q := fmt.Sprintf(`
+				DELETE FROM %s
+				WHERE %s AND NOT EXISTS (
+					SELECT 1 FROM files WHERE files.blocklist_hash = %s.blocklist_hash
+				)`, table, br.SQL(limitColumn), table)
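+			// E.g., for blocks and the range [0x400000, 0x800000) this yields
+			//   DELETE FROM blocks WHERE blocks.hash >= x'400000' AND blocks.hash < x'800000' AND NOT EXISTS (...)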
+
+			if res, err := tx.ExecContext(ctx, q); err != nil {
+				return wrap(err, "delete from "+table)
+			} else {
+				l.DebugContext(ctx, "GC query result", "processed", i, "runtime", time.Since(t0), "result", slogutil.Expensive(func() any {
+					rows, err := res.RowsAffected()
+					if err != nil {
+						return slogutil.Error(err)
+					}
+					return slog.Int64("rows", rows)
+				}))
+			}
+		}
 	}
 
 	return wrap(tx.Commit())
 }
+
+// blobRange defines a range for blob searching. A range is open ended if
+// start or end is nil.
+type blobRange struct {
+	start, end []byte
+}
+
+// SQL returns the SQL where clause for the given range, e.g.
+// `column >= x'49249248' AND column < x'6db6db6c'`.
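+// A range open at both ends yields an empty clause; callers here always use more than one range.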
+func (r blobRange) SQL(name string) string {
+	var sb strings.Builder
+	if r.start != nil {
+		fmt.Fprintf(&sb, "%s >= x'%x'", name, r.start)
+	}
+	if r.start != nil && r.end != nil {
+		sb.WriteString(" AND ")
+	}
+	if r.end != nil {
+		fmt.Fprintf(&sb, "%s < x'%x'", name, r.end)
+	}
+	return sb.String()
+}
+
+// randomBlobRanges returns n blobRanges in random order.
+func randomBlobRanges(n int) []blobRange {
+	ranges := blobRanges(n)
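+	// Shuffle so that runs cut short by gcMaxRuntime don't always revisit the same leading ranges.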
+	rand.Shuffle(len(ranges), func(i, j int) { ranges[i], ranges[j] = ranges[j], ranges[i] })
+	return ranges
+}
+
+// blobRanges returns n consecutive blobRanges covering the entire key space.
+func blobRanges(n int) []blobRange {
+	// We use three byte (24 bit) prefixes to get fairly granular ranges and
+	// easy bit conversions.
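+	// E.g., n = 4 yields [nil, 400000), [400000, 800000), [800000, c00000), [c00000, nil).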
+	rangeSize := (1 << 24) / n
+	ranges := make([]blobRange, 0, n)
+	var prev []byte
+	for i := range n {
+		var pref []byte
+		if i < n-1 {
+			end := (i + 1) * rangeSize
+			pref = intToBlob(end)
+		}
+		ranges = append(ranges, blobRange{prev, pref})
+		prev = pref
+	}
+	return ranges
+}
+
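+// intToBlob returns the three low-order bytes of n in big-endian order, e.g.
+// intToBlob(0x400000) returns []byte{0x40, 0x00, 0x00}.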
+func intToBlob(n int) []byte {
+	var pref [4]byte
+	binary.BigEndian.PutUint32(pref[:], uint32(n)) //nolint:gosec
+	// The first byte is always zero and not part of the range.
+	return pref[1:]
+}