folderdb_update.go

// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package sqlite

import (
    "cmp"
    "context"
    "fmt"
    "log/slog"
    "slices"

    "github.com/jmoiron/sqlx"
    "github.com/syncthing/syncthing/internal/gen/dbproto"
    "github.com/syncthing/syncthing/internal/itererr"
    "github.com/syncthing/syncthing/internal/slogutil"
    "github.com/syncthing/syncthing/lib/osutil"
    "github.com/syncthing/syncthing/lib/protocol"
    "github.com/syncthing/syncthing/lib/sliceutil"
    "google.golang.org/protobuf/proto"
)

const (
    // Arbitrarily chosen values for checkpoint frequency....
    updatePointsPerFile   = 100
    updatePointsPerBlock  = 1
    updatePointsThreshold = 250_000
)
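
// Update inserts or replaces the given files for the device, in a single
// transaction. Block lists are stored indirectly, and the global/need state
// is recalculated for every affected file name.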
func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    deviceIdx, err := s.deviceIdxLocked(device)
    if err != nil {
        return wrap(err)
    }

    tx, err := s.sql.BeginTxx(context.Background(), nil)
    if err != nil {
        return wrap(err)
    }
    defer tx.Rollback() //nolint:errcheck
    txp := &txPreparedStmts{Tx: tx}

    //nolint:sqlclosecheck
    insertFileStmt, err := txp.Preparex(`
        INSERT OR REPLACE INTO files (device_idx, remote_sequence, name, type, modified, size, version, deleted, local_flags, blocklist_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        RETURNING sequence
    `)
    if err != nil {
        return wrap(err, "prepare insert file")
    }

    //nolint:sqlclosecheck
    insertFileInfoStmt, err := txp.Preparex(`
        INSERT INTO fileinfos (sequence, fiprotobuf)
        VALUES (?, ?)
    `)
    if err != nil {
        return wrap(err, "prepare insert fileinfo")
    }

    //nolint:sqlclosecheck
    insertBlockListStmt, err := txp.Preparex(`
        INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
        VALUES (?, ?)
    `)
    if err != nil {
        return wrap(err, "prepare insert blocklist")
    }

    var prevRemoteSeq int64
    for i, f := range fs {
        f.Name = osutil.NormalizedFilename(f.Name)

        var blockshash *[]byte
        if len(f.Blocks) > 0 {
            f.BlocksHash = protocol.BlocksHash(f.Blocks)
            blockshash = &f.BlocksHash
        } else {
            f.BlocksHash = nil
        }

        if f.Type == protocol.FileInfoTypeDirectory {
            f.Size = 128 // synthetic directory size
        }

        // Insert the file.
        //
        // If it is a remote file, set remote_sequence; otherwise leave it at
        // null. Returns the new local sequence.
        var remoteSeq *int64
        if device != protocol.LocalDeviceID {
            if i > 0 && f.Sequence == prevRemoteSeq {
                return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
            }
            prevRemoteSeq = f.Sequence
            remoteSeq = &f.Sequence
        }
        var localSeq int64
        if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Name, f.Type, f.ModTime().UnixNano(), f.Size, f.Version.String(), f.IsDeleted(), f.LocalFlags, blockshash); err != nil {
            return wrap(err, "insert file")
        }

        if len(f.Blocks) > 0 {
            // Indirect the block list
            blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
            bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
            if err != nil {
                return wrap(err, "marshal blocklist")
            }
            if res, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
                return wrap(err, "insert blocklist")
            } else if aff, _ := res.RowsAffected(); aff != 0 && device == protocol.LocalDeviceID {
                // Insert all blocks
                if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
                    return wrap(err, "insert blocks")
                }
            }
            f.Blocks = nil
        }

        // Insert the fileinfo
        if device == protocol.LocalDeviceID {
            f.Sequence = localSeq
        }
        bs, err := proto.Marshal(f.ToWire(true))
        if err != nil {
            return wrap(err, "marshal fileinfo")
        }
        if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
            return wrap(err, "insert fileinfo")
        }

        // Update global and need
        if err := s.recalcGlobalForFile(txp, f.Name); err != nil {
            return wrap(err)
        }
    }

    if err := tx.Commit(); err != nil {
        return wrap(err)
    }

    s.periodicCheckpointLocked(fs)

    return nil
}
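
// DropDevice removes a remote device, which cascades to delete all file
// entries belonging to it, and then recalculates the global state. Calling
// it with the local device is a bug and panics.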
func (s *folderDB) DropDevice(device protocol.DeviceID) error {
    if device == protocol.LocalDeviceID {
        panic("bug: cannot drop local device")
    }

    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    tx, err := s.sql.BeginTxx(context.Background(), nil)
    if err != nil {
        return wrap(err)
    }
    defer tx.Rollback() //nolint:errcheck
    txp := &txPreparedStmts{Tx: tx}

    // Drop the device, which cascades to delete all files etc for it
    if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
        return wrap(err)
    }

    // Recalc the globals for all affected folders
    if err := s.recalcGlobalForFolder(txp); err != nil {
        return wrap(err)
    }

    return wrap(tx.Commit())
}
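
// DropAllFiles removes all file entries for the given device and, if any
// rows were actually deleted, recalculates the global state for the entire
// folder.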
func (s *folderDB) DropAllFiles(device protocol.DeviceID) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    // This is a two part operation, first dropping all the files and then
    // recalculating the global state for the entire folder.
    deviceIdx, err := s.deviceIdxLocked(device)
    if err != nil {
        return wrap(err)
    }

    tx, err := s.sql.BeginTxx(context.Background(), nil)
    if err != nil {
        return wrap(err)
    }
    defer tx.Rollback() //nolint:errcheck
    txp := &txPreparedStmts{Tx: tx}

    // Drop all the file entries
    result, err := tx.Exec(`
        DELETE FROM files
        WHERE device_idx = ?
    `, deviceIdx)
    if err != nil {
        return wrap(err)
    }
    if n, err := result.RowsAffected(); err == nil && n == 0 {
        // The delete affected no rows, so we don't need to redo the entire
        // global/need calculation.
        return wrap(tx.Commit())
    }

    // Recalc global for the entire folder
    if err := s.recalcGlobalForFolder(txp); err != nil {
        return wrap(err)
    }

    return wrap(tx.Commit())
}
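
// DropFilesNamed removes the named files for the given device and
// recalculates the global state for each affected name.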
func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error {
    for i := range names {
        names[i] = osutil.NormalizedFilename(names[i])
    }

    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    deviceIdx, err := s.deviceIdxLocked(device)
    if err != nil {
        return wrap(err)
    }

    tx, err := s.sql.BeginTxx(context.Background(), nil)
    if err != nil {
        return wrap(err)
    }
    defer tx.Rollback() //nolint:errcheck
    txp := &txPreparedStmts{Tx: tx}

    // Drop the named files
    query, args, err := sqlx.In(`
        DELETE FROM files
        WHERE device_idx = ? AND name IN (?)
    `, deviceIdx, names)
    if err != nil {
        return wrap(err)
    }
    if _, err := tx.Exec(query, args...); err != nil {
        return wrap(err)
    }

    // Recalc globals for the named files
    for _, name := range names {
        if err := s.recalcGlobalForFile(txp, name); err != nil {
            return wrap(err)
        }
    }

    return wrap(tx.Commit())
}
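
// insertBlocksLocked inserts the individual block rows for a block list,
// chunked so that a single statement stays below SQLite's limit on the
// number of bound variables.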
func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
    if len(blocks) == 0 {
        return nil
    }

    bs := make([]map[string]any, len(blocks))
    for i, b := range blocks {
        bs[i] = map[string]any{
            "hash":           b.Hash,
            "blocklist_hash": blocklistHash,
            "idx":            i,
            "offset":         b.Offset,
            "size":           b.Size,
        }
    }

    // Very large block lists (>8000 blocks) result in "too many variables"
    // error. Chunk it to a reasonable size.
    for chunk := range slices.Chunk(bs, 1000) {
        if _, err := tx.NamedExec(`
            INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
            VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
        `, chunk); err != nil {
            return wrap(err)
        }
    }
    return nil
}
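
// recalcGlobalForFolder recalculates the global state for every file name
// in the folder that currently has no entry with the global flag set.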
func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error {
    // Select files where there is no global; those are the ones we need to
    // recalculate.
    //nolint:sqlclosecheck
    namesStmt, err := txp.Preparex(`
        SELECT f.name FROM files f
        WHERE NOT EXISTS (
            SELECT 1 FROM files g
            WHERE g.name = f.name AND g.local_flags & ? != 0
        )
        GROUP BY name
    `)
    if err != nil {
        return wrap(err)
    }
    rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal)
    if err != nil {
        return wrap(err)
    }
    defer rows.Close()
    for rows.Next() {
        var name string
        if err := rows.Scan(&name); err != nil {
            return wrap(err)
        }
        if err := s.recalcGlobalForFile(txp, name); err != nil {
            return wrap(err)
        }
    }
    return wrap(rows.Err())
}
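
// recalcGlobalForFile determines which entry is the global version for the
// given file name and updates the global and need flags on all entries
// accordingly.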
func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error {
    //nolint:sqlclosecheck
    selStmt, err := txp.Preparex(`
        SELECT name, device_idx, sequence, modified, version, deleted, local_flags FROM files
        WHERE name = ?
    `)
    if err != nil {
        return wrap(err)
    }
    es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file)))
    if err != nil {
        return wrap(err)
    }
    if len(es) == 0 {
        // shouldn't happen
        return nil
    }

    // Sort the entries; the global entry is at the head of the list
    slices.SortFunc(es, fileRow.Compare)

    // The global version is the first one in the list that is not invalid,
    // or just the first one in the list if all are invalid.
    var global fileRow
    globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.IsInvalid() })
    if globIdx < 0 {
        globIdx = 0
    }
    global = es[globIdx]

    // We "have" the file if the position in the list of versions is at the
    // global version or better, or if the version is the same as the global
    // file (we might be further down the list due to invalid flags), or if
    // the global is deleted and we don't have it at all...
    localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
    hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
        localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
        localIdx < 0 && global.Deleted // missing it, but the global is also deleted

    // Set the global flag on the global entry. Set the need flag if the
    // local device needs this file, unless it's invalid.
    global.LocalFlags |= protocol.FlagLocalGlobal
    if hasLocal || global.IsInvalid() {
        global.LocalFlags &= ^protocol.FlagLocalNeeded
    } else {
        global.LocalFlags |= protocol.FlagLocalNeeded
    }
    //nolint:sqlclosecheck
    upStmt, err := txp.Preparex(`
        UPDATE files SET local_flags = ?
        WHERE device_idx = ? AND sequence = ?
    `)
    if err != nil {
        return wrap(err)
    }
    if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil {
        return wrap(err)
    }

    // Clear the need and global flags on all other entries
    //nolint:sqlclosecheck
    upStmt, err = txp.Preparex(`
        UPDATE files SET local_flags = local_flags & ?
        WHERE name = ? AND sequence != ? AND local_flags & ? != 0
    `)
    if err != nil {
        return wrap(err)
    }
    if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
        return wrap(err)
    }
    return nil
}
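
// folderIdxLocked returns the index for the given folder ID, creating the
// folders row if it does not already exist.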
func (s *DB) folderIdxLocked(folderID string) (int64, error) {
    if _, err := s.stmt(`
        INSERT OR IGNORE INTO folders(folder_id)
        VALUES (?)
    `).Exec(folderID); err != nil {
        return 0, wrap(err)
    }

    var idx int64
    if err := s.stmt(`
        SELECT idx FROM folders
        WHERE folder_id = ?
    `).Get(&idx, folderID); err != nil {
        return 0, wrap(err)
    }

    return idx, nil
}
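
// fileRow holds the file columns scanned when determining the global
// version of a file.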
type fileRow struct {
    Name       string
    Version    dbVector
    DeviceIdx  int64 `db:"device_idx"`
    Sequence   int64
    Modified   int64
    Size       int64
    LocalFlags protocol.FlagLocal `db:"local_flags"`
    Deleted    bool
}
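
// Compare orders file rows so that the entry that should win as the global
// version sorts first, mirroring FileInfo.WinsConflict: newer versions sort
// before older ones, valid entries before invalid ones, and concurrent
// (conflicting) versions are ordered by modification time and then by the
// version vector comparison.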
func (e fileRow) Compare(other fileRow) int {
    // From FileInfo.WinsConflict
    vc := e.Version.Compare(other.Version.Vector)
    switch vc {
    case protocol.Equal:
        if e.IsInvalid() != other.IsInvalid() {
            if e.IsInvalid() {
                return 1
            }
            return -1
        }
        // Compare the device ID index, lower is better. This is only
        // deterministic to the extent that LocalDeviceID will always be the
        // lowest one, order between remote devices is random (and
        // irrelevant).
        return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
    case protocol.Greater: // we are newer
        return -1
    case protocol.Lesser: // we are older
        return 1
    case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
        if e.IsInvalid() != other.IsInvalid() {
            if e.IsInvalid() { // we are invalid, we lose
                return 1
            }
            return -1 // they are invalid, we win
        }
        if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
            return -d // positive d means we were newer, so we win (negative return)
        }
        if vc == protocol.ConcurrentGreater {
            return -1 // we have a better device ID, we win
        }
        return 1 // they win
    default:
        return 0
    }
}
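
// IsInvalid reports whether the row is marked invalid by its local flags.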
func (e fileRow) IsInvalid() bool {
    return e.LocalFlags.IsInvalid()
}
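
// periodicCheckpointLocked accumulates update points for the given files
// and their blocks, and triggers a WAL checkpoint once the threshold has
// been exceeded.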
func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) {
    // Induce periodic checkpoints. We add points for each file and block,
    // and checkpoint when we've written more than a threshold of points.
    // This ensures we do not go too long without a checkpoint, while also
    // not doing it incessantly for every update.
    s.updatePoints += updatePointsPerFile * len(fs)
    for _, f := range fs {
        s.updatePoints += len(f.Blocks) * updatePointsPerBlock
    }
    if s.updatePoints > updatePointsThreshold {
        conn, err := s.sql.Conn(context.Background())
        if err != nil {
            slog.Debug("Connection error", slog.String("db", s.baseName), slogutil.Error(err))
            return
        }
        defer conn.Close()
        if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil {
            slog.Debug("PRAGMA journal_size_limit error", slog.String("db", s.baseName), slogutil.Error(err))
        }

        // Every 50th checkpoint becomes a truncate, in an effort to bring
        // down the size now and then.
        checkpointType := "RESTART"
        if s.checkpointsCount > 50 {
            checkpointType = "TRUNCATE"
        }
        cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType)
        row := conn.QueryRowContext(context.Background(), cmd)
        var res, modified, moved int
        if row.Err() != nil {
            slog.Debug("Command error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(row.Err()))
        } else if err := row.Scan(&res, &modified, &moved); err != nil {
            slog.Debug("Command scan error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
        } else {
            slog.Debug("Checkpoint result", "db", s.baseName, "checkpointscount", s.checkpointsCount, "updatepoints", s.updatePoints, "res", res, "modified", modified, "moved", moved)
        }

        // Reset the truncate counter when a truncate succeeded. If it
        // failed, we'll keep trying it until we succeed. Increase it faster
        // when we fail to checkpoint, as it's more likely the WAL is
        // growing and will need truncation when we get out of this state.
        switch {
        case res == 1:
            s.checkpointsCount += 10
        case res == 0 && checkpointType == "TRUNCATE":
            s.checkpointsCount = 0
        default:
            s.checkpointsCount++
        }
        s.updatePoints = 0
    }
}