db_service.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Copyright (C) 2025 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package sqlite
  7. import (
  8. "context"
  9. "fmt"
  10. "log/slog"
  11. "time"
  12. "github.com/jmoiron/sqlx"
  13. "github.com/syncthing/syncthing/internal/db"
  14. "github.com/syncthing/syncthing/internal/slogutil"
  15. "github.com/thejerf/suture/v4"
  16. )
  17. const (
  18. internalMetaPrefix = "dbsvc"
  19. lastMaintKey = "lastMaint"
  20. defaultDeleteRetention = 180 * 24 * time.Hour
  21. minDeleteRetention = 24 * time.Hour
  22. )
  23. func (s *DB) Service(maintenanceInterval time.Duration) suture.Service {
  24. return newService(s, maintenanceInterval)
  25. }
  26. type Service struct {
  27. sdb *DB
  28. maintenanceInterval time.Duration
  29. internalMeta *db.Typed
  30. }
  31. func (s *Service) String() string {
  32. return fmt.Sprintf("sqlite.service@%p", s)
  33. }
  34. func newService(sdb *DB, maintenanceInterval time.Duration) *Service {
  35. return &Service{
  36. sdb: sdb,
  37. maintenanceInterval: maintenanceInterval,
  38. internalMeta: db.NewTyped(sdb, internalMetaPrefix),
  39. }
  40. }
  41. func (s *Service) Serve(ctx context.Context) error {
  42. // Run periodic maintenance
  43. // Figure out when we last ran maintenance and schedule accordingly. If
  44. // it was never, do it now.
  45. lastMaint, _, _ := s.internalMeta.Time(lastMaintKey)
  46. nextMaint := lastMaint.Add(s.maintenanceInterval)
  47. wait := time.Until(nextMaint)
  48. if wait < 0 {
  49. wait = time.Minute
  50. }
  51. slog.DebugContext(ctx, "Next periodic run due", "after", wait)
  52. timer := time.NewTimer(wait)
  53. for {
  54. select {
  55. case <-ctx.Done():
  56. return ctx.Err()
  57. case <-timer.C:
  58. }
  59. if err := s.periodic(ctx); err != nil {
  60. return wrap(err)
  61. }
  62. timer.Reset(s.maintenanceInterval)
  63. slog.DebugContext(ctx, "Next periodic run due", "after", s.maintenanceInterval)
  64. _ = s.internalMeta.PutTime(lastMaintKey, time.Now())
  65. }
  66. }
  67. func (s *Service) periodic(ctx context.Context) error {
  68. t0 := time.Now()
  69. slog.DebugContext(ctx, "Periodic start")
  70. t1 := time.Now()
  71. defer func() { slog.DebugContext(ctx, "Periodic done in", "t1", time.Since(t1), "t0t1", t1.Sub(t0)) }()
  72. s.sdb.updateLock.Lock()
  73. err := tidy(ctx, s.sdb.sql)
  74. s.sdb.updateLock.Unlock()
  75. if err != nil {
  76. return err
  77. }
  78. return wrap(s.sdb.forEachFolder(func(fdb *folderDB) error {
  79. fdb.updateLock.Lock()
  80. defer fdb.updateLock.Unlock()
  81. if err := garbageCollectOldDeletedLocked(ctx, fdb); err != nil {
  82. return wrap(err)
  83. }
  84. if err := garbageCollectBlocklistsAndBlocksLocked(ctx, fdb); err != nil {
  85. return wrap(err)
  86. }
  87. return tidy(ctx, fdb.sql)
  88. }))
  89. }
  90. func tidy(ctx context.Context, db *sqlx.DB) error {
  91. conn, err := db.Conn(ctx)
  92. if err != nil {
  93. return wrap(err)
  94. }
  95. defer conn.Close()
  96. _, _ = conn.ExecContext(ctx, `ANALYZE`)
  97. _, _ = conn.ExecContext(ctx, `PRAGMA optimize`)
  98. _, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`)
  99. _, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 8388608`)
  100. _, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
  101. return nil
  102. }
  103. func garbageCollectOldDeletedLocked(ctx context.Context, fdb *folderDB) error {
  104. l := slog.With("fdb", fdb.baseDB)
  105. if fdb.deleteRetention <= 0 {
  106. slog.DebugContext(ctx, "Delete retention is infinite, skipping cleanup")
  107. return nil
  108. }
  109. // Remove deleted files that are marked as not needed (we have processed
  110. // them) and they were deleted more than MaxDeletedFileAge ago.
  111. l.DebugContext(ctx, "Forgetting deleted files", "retention", fdb.deleteRetention)
  112. res, err := fdb.stmt(`
  113. DELETE FROM files
  114. WHERE deleted AND modified < ? AND local_flags & {{.FlagLocalNeeded}} == 0
  115. `).Exec(time.Now().Add(-fdb.deleteRetention).UnixNano())
  116. if err != nil {
  117. return wrap(err)
  118. }
  119. if aff, err := res.RowsAffected(); err == nil {
  120. l.DebugContext(ctx, "Removed old deleted file records", "affected", aff)
  121. }
  122. return nil
  123. }
  124. func garbageCollectBlocklistsAndBlocksLocked(ctx context.Context, fdb *folderDB) error {
  125. // Remove all blocklists not referred to by any files and, by extension,
  126. // any blocks not referred to by a blocklist. This is an expensive
  127. // operation when run normally, especially if there are a lot of blocks
  128. // to collect.
  129. //
  130. // We make this orders of magnitude faster by disabling foreign keys for
  131. // the transaction and doing the cleanup manually. This requires using
  132. // an explicit connection and disabling foreign keys before starting the
  133. // transaction. We make sure to clean up on the way out.
  134. conn, err := fdb.sql.Connx(ctx)
  135. if err != nil {
  136. return wrap(err)
  137. }
  138. defer conn.Close()
  139. if _, err := conn.ExecContext(ctx, `PRAGMA foreign_keys = 0`); err != nil {
  140. return wrap(err)
  141. }
  142. defer func() { //nolint:contextcheck
  143. _, _ = conn.ExecContext(context.Background(), `PRAGMA foreign_keys = 1`)
  144. }()
  145. tx, err := conn.BeginTxx(ctx, nil)
  146. if err != nil {
  147. return wrap(err)
  148. }
  149. defer tx.Rollback() //nolint:errcheck
  150. if res, err := tx.ExecContext(ctx, `
  151. DELETE FROM blocklists
  152. WHERE NOT EXISTS (
  153. SELECT 1 FROM files WHERE files.blocklist_hash = blocklists.blocklist_hash
  154. )`); err != nil {
  155. return wrap(err, "delete blocklists")
  156. } else {
  157. slog.DebugContext(ctx, "Blocklist GC", "fdb", fdb.baseName, "result", slogutil.Expensive(func() any {
  158. rows, err := res.RowsAffected()
  159. if err != nil {
  160. return slogutil.Error(err)
  161. }
  162. return slog.Int64("rows", rows)
  163. }))
  164. }
  165. if res, err := tx.ExecContext(ctx, `
  166. DELETE FROM blocks
  167. WHERE NOT EXISTS (
  168. SELECT 1 FROM blocklists WHERE blocklists.blocklist_hash = blocks.blocklist_hash
  169. )`); err != nil {
  170. return wrap(err, "delete blocks")
  171. } else {
  172. slog.DebugContext(ctx, "Blocks GC", "fdb", fdb.baseName, "result", slogutil.Expensive(func() any {
  173. rows, err := res.RowsAffected()
  174. if err != nil {
  175. return slogutil.Error(err)
  176. }
  177. return slog.Int64("rows", rows)
  178. }))
  179. }
  180. return wrap(tx.Commit())
  181. }