db_service.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. // Copyright (C) 2025 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package sqlite
  7. import (
  8. "context"
  9. "encoding/binary"
  10. "fmt"
  11. "log/slog"
  12. "math/rand"
  13. "strings"
  14. "time"
  15. "github.com/jmoiron/sqlx"
  16. "github.com/syncthing/syncthing/internal/db"
  17. "github.com/syncthing/syncthing/internal/slogutil"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. )
const (
	// internalMetaPrefix is the keyspace prefix for the service's own
	// bookkeeping entries in the typed metadata store.
	internalMetaPrefix = "dbsvc"
	// lastMaintKey stores the time of the last completed maintenance run.
	lastMaintKey = "lastMaint"
	// lastSuccessfulGCSeqKey stores the local device sequence at the last
	// successful GC, used to skip GC when nothing has changed since.
	lastSuccessfulGCSeqKey = "lastSuccessfulGCSeq"

	gcMinChunks  = 5               // minimum number of chunks a GC pass is split into
	gcChunkSize  = 100_000         // approximate number of rows to process in a single gc query
	gcMaxRuntime = 5 * time.Minute // max time to spend on gc, per table, per run
)
  28. func (s *DB) Service(maintenanceInterval time.Duration) db.DBService {
  29. return newService(s, maintenanceInterval)
  30. }
// Service performs periodic maintenance (tidying and garbage collection)
// on a DB and its per-folder databases.
type Service struct {
	sdb                 *DB
	maintenanceInterval time.Duration   // zero disables the periodic timer; see Serve
	internalMeta        *db.Typed       // bookkeeping entries under internalMetaPrefix
	start               chan chan error // manual run triggers from StartMaintenance
}
  37. func (s *Service) String() string {
  38. return fmt.Sprintf("sqlite.service@%p", s)
  39. }
  40. func newService(sdb *DB, maintenanceInterval time.Duration) *Service {
  41. return &Service{
  42. sdb: sdb,
  43. maintenanceInterval: maintenanceInterval,
  44. internalMeta: db.NewTyped(sdb, internalMetaPrefix),
  45. start: make(chan chan error),
  46. }
  47. }
  48. func (s *Service) StartMaintenance() <-chan error {
  49. finishChan := make(chan error, 1)
  50. select {
  51. case s.start <- finishChan:
  52. default:
  53. }
  54. return finishChan
  55. }
  56. func (s *Service) Serve(ctx context.Context) error {
  57. // Run periodic maintenance
  58. // Figure out when we last ran maintenance and schedule accordingly. If
  59. // it was never, do it now.
  60. lastMaint, _, _ := s.internalMeta.Time(lastMaintKey)
  61. nextMaint := lastMaint.Add(s.maintenanceInterval)
  62. wait := time.Until(nextMaint)
  63. if wait < 0 {
  64. wait = time.Minute
  65. }
  66. slog.DebugContext(ctx, "Next periodic run due", "after", wait)
  67. timer := time.NewTimer(wait)
  68. if s.maintenanceInterval == 0 {
  69. timer.Stop()
  70. }
  71. for {
  72. var finishChan chan error
  73. select {
  74. case <-ctx.Done():
  75. return ctx.Err()
  76. case <-timer.C:
  77. case finishChan = <-s.start:
  78. }
  79. err := s.periodic(ctx)
  80. if finishChan != nil {
  81. finishChan <- err
  82. }
  83. if err != nil {
  84. return wrap(err)
  85. }
  86. if s.maintenanceInterval != 0 {
  87. timer.Reset(s.maintenanceInterval)
  88. slog.DebugContext(ctx, "Next periodic run due", "after", s.maintenanceInterval)
  89. }
  90. _ = s.internalMeta.PutTime(lastMaintKey, time.Now())
  91. }
  92. }
  93. func (s *Service) LastMaintenanceTime() time.Time {
  94. lastMaint, _, _ := s.internalMeta.Time(lastMaintKey)
  95. return lastMaint
  96. }
// periodic performs one maintenance pass: tidy the main database, then
// garbage collect each folder database that has changed since its last
// successful GC.
func (s *Service) periodic(ctx context.Context) error {
	t0 := time.Now()
	slog.DebugContext(ctx, "Periodic start")
	t1 := time.Now()
	defer func() { slog.DebugContext(ctx, "Periodic done in", "t1", time.Since(t1), "t0t1", t1.Sub(t0)) }()

	// Tidy the main database while holding the update lock, keeping
	// writers out during checkpoint/vacuum.
	s.sdb.updateLock.Lock()
	err := tidy(ctx, s.sdb.sql)
	s.sdb.updateLock.Unlock()
	if err != nil {
		return err
	}

	return wrap(s.sdb.forEachFolder(func(fdb *folderDB) error {
		// Get the current device sequence, for comparison in the next step.
		seq, err := fdb.GetDeviceSequence(protocol.LocalDeviceID)
		if err != nil {
			return wrap(err)
		}

		// Get the last successful GC sequence. If it's the same as the
		// current sequence, nothing has changed and we can skip the GC
		// entirely.
		meta := db.NewTyped(fdb, internalMetaPrefix)
		if prev, _, err := meta.Int64(lastSuccessfulGCSeqKey); err != nil {
			return wrap(err)
		} else if seq == prev {
			slog.DebugContext(ctx, "Skipping unnecessary GC", "folder", fdb.folderID, "fdb", fdb.baseName)
			return nil
		}

		// Run the GC steps, in a function to be able to use a deferred
		// unlock.
		if err := func() error {
			fdb.updateLock.Lock()
			defer fdb.updateLock.Unlock()
			if err := garbageCollectOldDeletedLocked(ctx, fdb); err != nil {
				return wrap(err)
			}
			if err := garbageCollectNamesAndVersions(ctx, fdb); err != nil {
				return wrap(err)
			}
			if err := garbageCollectBlocklistsAndBlocksLocked(ctx, fdb); err != nil {
				return wrap(err)
			}
			return tidy(ctx, fdb.sql)
		}(); err != nil {
			return wrap(err)
		}

		// Update the successful GC sequence, so the next pass can skip
		// this folder if nothing changes in the meantime.
		return wrap(meta.PutInt64(lastSuccessfulGCSeqKey, seq))
	}))
}
  146. func tidy(ctx context.Context, db *sqlx.DB) error {
  147. conn, err := db.Conn(ctx)
  148. if err != nil {
  149. return wrap(err)
  150. }
  151. defer conn.Close()
  152. _, _ = conn.ExecContext(ctx, `ANALYZE`)
  153. _, _ = conn.ExecContext(ctx, `PRAGMA optimize`)
  154. _, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`)
  155. _, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 8388608`)
  156. _, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
  157. return nil
  158. }
  159. func garbageCollectNamesAndVersions(ctx context.Context, fdb *folderDB) error {
  160. l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName)
  161. res, err := fdb.stmt(`
  162. DELETE FROM file_names
  163. WHERE NOT EXISTS (SELECT 1 FROM files f WHERE f.name_idx = idx)
  164. `).Exec()
  165. if err != nil {
  166. return wrap(err, "delete names")
  167. }
  168. if aff, err := res.RowsAffected(); err == nil {
  169. l.DebugContext(ctx, "Removed old file names", "affected", aff)
  170. }
  171. res, err = fdb.stmt(`
  172. DELETE FROM file_versions
  173. WHERE NOT EXISTS (SELECT 1 FROM files f WHERE f.version_idx = idx)
  174. `).Exec()
  175. if err != nil {
  176. return wrap(err, "delete versions")
  177. }
  178. if aff, err := res.RowsAffected(); err == nil {
  179. l.DebugContext(ctx, "Removed old file versions", "affected", aff)
  180. }
  181. return nil
  182. }
  183. func garbageCollectOldDeletedLocked(ctx context.Context, fdb *folderDB) error {
  184. l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName)
  185. if fdb.deleteRetention <= 0 {
  186. slog.DebugContext(ctx, "Delete retention is infinite, skipping cleanup")
  187. return nil
  188. }
  189. // Remove deleted files that are marked as not needed (we have processed
  190. // them) and they were deleted more than MaxDeletedFileAge ago.
  191. l.DebugContext(ctx, "Forgetting deleted files", "retention", fdb.deleteRetention)
  192. res, err := fdb.stmt(`
  193. DELETE FROM files
  194. WHERE deleted AND modified < ? AND local_flags & {{.FlagLocalNeeded}} == 0
  195. `).Exec(time.Now().Add(-fdb.deleteRetention).UnixNano())
  196. if err != nil {
  197. return wrap(err)
  198. }
  199. if aff, err := res.RowsAffected(); err == nil {
  200. l.DebugContext(ctx, "Removed old deleted file records", "affected", aff)
  201. }
  202. return nil
  203. }
// garbageCollectBlocklistsAndBlocksLocked removes blocklists and blocks
// rows no longer referenced from the files table, processing each table
// in randomly-ordered chunks with a per-table time budget (gcMaxRuntime).
// Caller must hold fdb.updateLock.
func garbageCollectBlocklistsAndBlocksLocked(ctx context.Context, fdb *folderDB) error {
	// Remove all blocklists not referred to by any files and, by extension,
	// any blocks not referred to by a blocklist. This is an expensive
	// operation when run normally, especially if there are a lot of blocks
	// to collect.
	//
	// We make this orders of magnitude faster by disabling foreign keys for
	// the transaction and doing the cleanup manually. This requires using
	// an explicit connection and disabling foreign keys before starting the
	// transaction. We make sure to clean up on the way out.
	conn, err := fdb.sql.Connx(ctx)
	if err != nil {
		return wrap(err)
	}
	defer conn.Close()

	if _, err := conn.ExecContext(ctx, `PRAGMA foreign_keys = 0`); err != nil {
		return wrap(err)
	}
	defer func() { //nolint:contextcheck
		// Re-enable foreign keys even when ctx is already cancelled,
		// hence the fresh background context.
		_, _ = conn.ExecContext(context.Background(), `PRAGMA foreign_keys = 1`)
	}()

	tx, err := conn.BeginTxx(ctx, nil)
	if err != nil {
		return wrap(err)
	}
	// No-op after a successful Commit below.
	defer tx.Rollback() //nolint:errcheck

	// Both blocklists and blocks refer to blocklists_hash from the files table.
	for _, table := range []string{"blocklists", "blocks"} {
		// Count the number of rows, to decide how many chunks to use.
		var rows int64
		if err := tx.GetContext(ctx, &rows, `SELECT count(*) FROM `+table); err != nil {
			return wrap(err)
		}
		chunks := max(gcMinChunks, rows/gcChunkSize)

		l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName, "table", table, "rows", rows, "chunks", chunks)

		// Process rows in chunks up to a given time limit. We always use at
		// least gcMinChunks chunks, then increase the number as the number of rows
		// exceeds gcMinChunks*gcChunkSize. Random chunk order means an
		// interrupted run still spreads work over the whole key space.
		t0 := time.Now()
		for i, br := range randomBlobRanges(int(chunks)) {
			if d := time.Since(t0); d > gcMaxRuntime {
				// Out of budget for this table; remaining chunks are
				// left for a future run.
				l.InfoContext(ctx, "GC was interrupted due to exceeding time limit", "processed", i, "runtime", time.Since(t0))
				break
			}

			// The limit column must be an indexed column with a mostly random distribution of blobs.
			// That's the blocklist_hash column for blocklists, and the hash column for blocks.
			limitColumn := table + ".blocklist_hash"
			if table == "blocks" {
				limitColumn = "blocks.hash"
			}

			q := fmt.Sprintf(`
				DELETE FROM %s
				WHERE %s AND NOT EXISTS (
					SELECT 1 FROM files WHERE files.blocklist_hash = %s.blocklist_hash
				)`, table, br.SQL(limitColumn), table)
			if res, err := tx.ExecContext(ctx, q); err != nil {
				return wrap(err, "delete from "+table)
			} else {
				// RowsAffected is only computed when debug logging is on.
				l.DebugContext(ctx, "GC query result", "processed", i, "runtime", time.Since(t0), "result", slogutil.Expensive(func() any {
					rows, err := res.RowsAffected()
					if err != nil {
						return slogutil.Error(err)
					}
					return slog.Int64("rows", rows)
				}))
			}
		}
	}

	return wrap(tx.Commit())
}
  274. // blobRange defines a range for blob searching. A range is open ended if
  275. // start or end is nil.
  276. type blobRange struct {
  277. start, end []byte
  278. }
  279. // SQL returns the SQL where clause for the given range, e.g.
  280. // `column >= x'49249248' AND column < x'6db6db6c'`
  281. func (r blobRange) SQL(name string) string {
  282. var sb strings.Builder
  283. if r.start != nil {
  284. fmt.Fprintf(&sb, "%s >= x'%x'", name, r.start)
  285. }
  286. if r.start != nil && r.end != nil {
  287. sb.WriteString(" AND ")
  288. }
  289. if r.end != nil {
  290. fmt.Fprintf(&sb, "%s < x'%x'", name, r.end)
  291. }
  292. return sb.String()
  293. }
  294. // randomBlobRanges returns n blobRanges in random order
  295. func randomBlobRanges(n int) []blobRange {
  296. ranges := blobRanges(n)
  297. rand.Shuffle(len(ranges), func(i, j int) { ranges[i], ranges[j] = ranges[j], ranges[i] })
  298. return ranges
  299. }
  300. // blobRanges returns n blobRanges
  301. func blobRanges(n int) []blobRange {
  302. // We use three byte (24 bit) prefixes to get fairly granular ranges and easy bit
  303. // conversions.
  304. rangeSize := (1 << 24) / n
  305. ranges := make([]blobRange, 0, n)
  306. var prev []byte
  307. for i := range n {
  308. var pref []byte
  309. if i < n-1 {
  310. end := (i + 1) * rangeSize
  311. pref = intToBlob(end)
  312. }
  313. ranges = append(ranges, blobRange{prev, pref})
  314. prev = pref
  315. }
  316. return ranges
  317. }
  318. func intToBlob(n int) []byte {
  319. var pref [4]byte
  320. binary.BigEndian.PutUint32(pref[:], uint32(n)) //nolint:gosec
  321. // first byte is always zero and not part of the range
  322. return pref[1:]
  323. }