folderdb_update.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. // Copyright (C) 2025 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package sqlite
  7. import (
  8. "cmp"
  9. "context"
  10. "fmt"
  11. "log/slog"
  12. "slices"
  13. "github.com/jmoiron/sqlx"
  14. "github.com/syncthing/syncthing/internal/gen/dbproto"
  15. "github.com/syncthing/syncthing/internal/itererr"
  16. "github.com/syncthing/syncthing/internal/slogutil"
  17. "github.com/syncthing/syncthing/lib/osutil"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. "github.com/syncthing/syncthing/lib/sliceutil"
  20. "google.golang.org/protobuf/proto"
  21. )
  22. const (
  23. // Arbitrarily chosen values for checkpoint frequency....
  24. updatePointsPerFile = 100
  25. updatePointsPerBlock = 1
  26. updatePointsThreshold = 250_000
  27. )
  28. func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error {
  29. s.updateLock.Lock()
  30. defer s.updateLock.Unlock()
  31. deviceIdx, err := s.deviceIdxLocked(device)
  32. if err != nil {
  33. return wrap(err)
  34. }
  35. tx, err := s.sql.BeginTxx(context.Background(), nil)
  36. if err != nil {
  37. return wrap(err)
  38. }
  39. defer tx.Rollback() //nolint:errcheck
  40. txp := &txPreparedStmts{Tx: tx}
  41. //nolint:sqlclosecheck
  42. insertNameStmt, err := txp.Preparex(`
  43. INSERT INTO file_names(name)
  44. VALUES (?)
  45. ON CONFLICT(name) DO UPDATE
  46. SET name = excluded.name
  47. RETURNING idx
  48. `)
  49. if err != nil {
  50. return wrap(err, "prepare insert name")
  51. }
  52. //nolint:sqlclosecheck
  53. insertVersionStmt, err := txp.Preparex(`
  54. INSERT INTO file_versions (version)
  55. VALUES (?)
  56. ON CONFLICT(version) DO UPDATE
  57. SET version = excluded.version
  58. RETURNING idx
  59. `)
  60. if err != nil {
  61. return wrap(err, "prepare insert version")
  62. }
  63. //nolint:sqlclosecheck
  64. insertFileStmt, err := txp.Preparex(`
  65. INSERT OR REPLACE INTO files (device_idx, remote_sequence, type, modified, size, deleted, local_flags, blocklist_hash, name_idx, version_idx)
  66. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  67. RETURNING sequence
  68. `)
  69. if err != nil {
  70. return wrap(err, "prepare insert file")
  71. }
  72. //nolint:sqlclosecheck
  73. insertFileInfoStmt, err := txp.Preparex(`
  74. INSERT INTO fileinfos (sequence, fiprotobuf)
  75. VALUES (?, ?)
  76. `)
  77. if err != nil {
  78. return wrap(err, "prepare insert fileinfo")
  79. }
  80. //nolint:sqlclosecheck
  81. insertBlockListStmt, err := txp.Preparex(`
  82. INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
  83. VALUES (?, ?)
  84. `)
  85. if err != nil {
  86. return wrap(err, "prepare insert blocklist")
  87. }
  88. var prevRemoteSeq int64
  89. for i, f := range fs {
  90. f.Name = osutil.NormalizedFilename(f.Name)
  91. var blockshash *[]byte
  92. if len(f.Blocks) > 0 {
  93. f.BlocksHash = protocol.BlocksHash(f.Blocks)
  94. blockshash = &f.BlocksHash
  95. } else {
  96. f.BlocksHash = nil
  97. }
  98. if f.Type == protocol.FileInfoTypeDirectory {
  99. f.Size = 128 // synthetic directory size
  100. }
  101. // Insert the file.
  102. //
  103. // If it is a remote file, set remote_sequence otherwise leave it at
  104. // null. Returns the new local sequence.
  105. var remoteSeq *int64
  106. if device != protocol.LocalDeviceID {
  107. if i > 0 && f.Sequence == prevRemoteSeq {
  108. return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
  109. }
  110. prevRemoteSeq = f.Sequence
  111. remoteSeq = &f.Sequence
  112. }
  113. var nameIdx int64
  114. if err := insertNameStmt.Get(&nameIdx, f.Name); err != nil {
  115. return wrap(err, "insert name")
  116. }
  117. var versionIdx int64
  118. if err := insertVersionStmt.Get(&versionIdx, f.Version.String()); err != nil {
  119. return wrap(err, "insert version")
  120. }
  121. var localSeq int64
  122. if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Type, f.ModTime().UnixNano(), f.Size, f.IsDeleted(), f.LocalFlags, blockshash, nameIdx, versionIdx); err != nil {
  123. return wrap(err, "insert file")
  124. }
  125. if len(f.Blocks) > 0 {
  126. // Indirect the block list
  127. blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
  128. bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
  129. if err != nil {
  130. return wrap(err, "marshal blocklist")
  131. }
  132. if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
  133. return wrap(err, "insert blocklist")
  134. } else if device == protocol.LocalDeviceID {
  135. // Insert all blocks
  136. if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
  137. return wrap(err, "insert blocks")
  138. }
  139. }
  140. f.Blocks = nil
  141. }
  142. // Insert the fileinfo
  143. if device == protocol.LocalDeviceID {
  144. f.Sequence = localSeq
  145. }
  146. bs, err := proto.Marshal(f.ToWire(true))
  147. if err != nil {
  148. return wrap(err, "marshal fileinfo")
  149. }
  150. if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
  151. return wrap(err, "insert fileinfo")
  152. }
  153. // Update global and need
  154. if err := s.recalcGlobalForFile(txp, f.Name); err != nil {
  155. return wrap(err)
  156. }
  157. }
  158. if err := tx.Commit(); err != nil {
  159. return wrap(err)
  160. }
  161. s.periodicCheckpointLocked(fs)
  162. return nil
  163. }
  164. func (s *folderDB) DropDevice(device protocol.DeviceID) error {
  165. if device == protocol.LocalDeviceID {
  166. panic("bug: cannot drop local device")
  167. }
  168. s.updateLock.Lock()
  169. defer s.updateLock.Unlock()
  170. tx, err := s.sql.BeginTxx(context.Background(), nil)
  171. if err != nil {
  172. return wrap(err)
  173. }
  174. defer tx.Rollback() //nolint:errcheck
  175. txp := &txPreparedStmts{Tx: tx}
  176. // Drop the device, which cascades to delete all files etc for it
  177. if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
  178. return wrap(err)
  179. }
  180. // Recalc the globals for all affected folders
  181. if err := s.recalcGlobalForFolder(txp); err != nil {
  182. return wrap(err)
  183. }
  184. return wrap(tx.Commit())
  185. }
  186. func (s *folderDB) DropAllFiles(device protocol.DeviceID) error {
  187. s.updateLock.Lock()
  188. defer s.updateLock.Unlock()
  189. // This is a two part operation, first dropping all the files and then
  190. // recalculating the global state for the entire folder.
  191. deviceIdx, err := s.deviceIdxLocked(device)
  192. if err != nil {
  193. return wrap(err)
  194. }
  195. tx, err := s.sql.BeginTxx(context.Background(), nil)
  196. if err != nil {
  197. return wrap(err)
  198. }
  199. defer tx.Rollback() //nolint:errcheck
  200. txp := &txPreparedStmts{Tx: tx}
  201. // Drop all the file entries
  202. result, err := tx.Exec(`
  203. DELETE FROM files
  204. WHERE device_idx = ?
  205. `, deviceIdx)
  206. if err != nil {
  207. return wrap(err)
  208. }
  209. if n, err := result.RowsAffected(); err == nil && n == 0 {
  210. // The delete affected no rows, so we don't need to redo the entire
  211. // global/need calculation.
  212. return wrap(tx.Commit())
  213. }
  214. // Recalc global for the entire folder
  215. if err := s.recalcGlobalForFolder(txp); err != nil {
  216. return wrap(err)
  217. }
  218. return wrap(tx.Commit())
  219. }
  220. func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error {
  221. for i := range names {
  222. names[i] = osutil.NormalizedFilename(names[i])
  223. }
  224. s.updateLock.Lock()
  225. defer s.updateLock.Unlock()
  226. deviceIdx, err := s.deviceIdxLocked(device)
  227. if err != nil {
  228. return wrap(err)
  229. }
  230. tx, err := s.sql.BeginTxx(context.Background(), nil)
  231. if err != nil {
  232. return wrap(err)
  233. }
  234. defer tx.Rollback() //nolint:errcheck
  235. txp := &txPreparedStmts{Tx: tx}
  236. // Drop the named files
  237. query, args, err := sqlx.In(`
  238. DELETE FROM files
  239. WHERE device_idx = ? AND name_idx IN (
  240. SELECT idx FROM file_names WHERE name IN (?)
  241. )
  242. `, deviceIdx, names)
  243. if err != nil {
  244. return wrap(err)
  245. }
  246. if _, err := tx.Exec(query, args...); err != nil {
  247. return wrap(err)
  248. }
  249. // Recalc globals for the named files
  250. for _, name := range names {
  251. if err := s.recalcGlobalForFile(txp, name); err != nil {
  252. return wrap(err)
  253. }
  254. }
  255. return wrap(tx.Commit())
  256. }
  257. func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
  258. if len(blocks) == 0 {
  259. return nil
  260. }
  261. bs := make([]map[string]any, len(blocks))
  262. for i, b := range blocks {
  263. bs[i] = map[string]any{
  264. "hash": b.Hash,
  265. "blocklist_hash": blocklistHash,
  266. "idx": i,
  267. "offset": b.Offset,
  268. "size": b.Size,
  269. }
  270. }
  271. // Very large block lists (>8000 blocks) result in "too many variables"
  272. // error. Chunk it to a reasonable size.
  273. for chunk := range slices.Chunk(bs, 1000) {
  274. if _, err := tx.NamedExec(`
  275. INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
  276. VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
  277. `, chunk); err != nil {
  278. return wrap(err)
  279. }
  280. }
  281. return nil
  282. }
  283. func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error {
  284. // Select files where there is no global, those are the ones we need to
  285. // recalculate.
  286. //nolint:sqlclosecheck
  287. namesStmt, err := txp.Preparex(`
  288. SELECT n.name FROM files f
  289. INNER JOIN file_names n ON n.idx = f.name_idx
  290. WHERE NOT EXISTS (
  291. SELECT 1 FROM files g
  292. WHERE g.name_idx = f.name_idx AND g.local_flags & ? != 0
  293. )
  294. GROUP BY n.name
  295. `)
  296. if err != nil {
  297. return wrap(err)
  298. }
  299. rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal)
  300. if err != nil {
  301. return wrap(err)
  302. }
  303. defer rows.Close()
  304. for rows.Next() {
  305. var name string
  306. if err := rows.Scan(&name); err != nil {
  307. return wrap(err)
  308. }
  309. if err := s.recalcGlobalForFile(txp, name); err != nil {
  310. return wrap(err)
  311. }
  312. }
  313. return wrap(rows.Err())
  314. }
  315. func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error {
  316. //nolint:sqlclosecheck
  317. selStmt, err := txp.Preparex(`
  318. SELECT n.name, f.device_idx, f.sequence, f.modified, v.version, f.deleted, f.local_flags FROM files f
  319. INNER JOIN file_versions v ON v.idx = f.version_idx
  320. INNER JOIN file_names n ON n.idx = f.name_idx
  321. WHERE n.name = ?
  322. `)
  323. if err != nil {
  324. return wrap(err, "prepare select")
  325. }
  326. es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file)))
  327. if err != nil {
  328. return wrap(err)
  329. }
  330. if len(es) == 0 {
  331. // shouldn't happen
  332. return nil
  333. }
  334. // Sort the entries; the global entry is at the head of the list
  335. slices.SortFunc(es, fileRow.Compare)
  336. // The global version is the first one in the list that is not invalid,
  337. // or just the first one in the list if all are invalid.
  338. var global fileRow
  339. globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.IsInvalid() })
  340. if globIdx < 0 {
  341. globIdx = 0
  342. }
  343. global = es[globIdx]
  344. // We "have" the file if the position in the list of versions is at the
  345. // global version or better, or if the version is the same as the global
  346. // file (we might be further down the list due to invalid flags), or if
  347. // the global is deleted and we don't have it at all...
  348. localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
  349. hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
  350. localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
  351. localIdx < 0 && global.Deleted // missing it, but the global is also deleted
  352. // Set the global flag on the global entry. Set the need flag if the
  353. // local device needs this file, unless it's invalid.
  354. global.LocalFlags |= protocol.FlagLocalGlobal
  355. if hasLocal || global.IsInvalid() {
  356. global.LocalFlags &= ^protocol.FlagLocalNeeded
  357. } else {
  358. global.LocalFlags |= protocol.FlagLocalNeeded
  359. }
  360. //nolint:sqlclosecheck
  361. upStmt, err := txp.Preparex(`
  362. UPDATE files SET local_flags = ?
  363. WHERE device_idx = ? AND sequence = ?
  364. `)
  365. if err != nil {
  366. return wrap(err)
  367. }
  368. if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil {
  369. return wrap(err)
  370. }
  371. // Clear the need and global flags on all other entries
  372. //nolint:sqlclosecheck
  373. upStmt, err = txp.Preparex(`
  374. UPDATE files SET local_flags = local_flags & ?
  375. WHERE name_idx = (SELECT idx FROM file_names WHERE name = ?) AND sequence != ? AND local_flags & ? != 0
  376. `)
  377. if err != nil {
  378. return wrap(err, "prepare update")
  379. }
  380. if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
  381. return wrap(err)
  382. }
  383. return nil
  384. }
  385. func (s *DB) folderIdxLocked(folderID string) (int64, error) {
  386. if _, err := s.stmt(`
  387. INSERT OR IGNORE INTO folders(folder_id)
  388. VALUES (?)
  389. `).Exec(folderID); err != nil {
  390. return 0, wrap(err)
  391. }
  392. var idx int64
  393. if err := s.stmt(`
  394. SELECT idx FROM folders
  395. WHERE folder_id = ?
  396. `).Get(&idx, folderID); err != nil {
  397. return 0, wrap(err)
  398. }
  399. return idx, nil
  400. }
  401. type fileRow struct {
  402. Name string
  403. Version dbVector
  404. DeviceIdx int64 `db:"device_idx"`
  405. Sequence int64
  406. Modified int64
  407. Size int64
  408. LocalFlags protocol.FlagLocal `db:"local_flags"`
  409. Deleted bool
  410. }
  411. func (e fileRow) Compare(other fileRow) int {
  412. // From FileInfo.WinsConflict
  413. vc := e.Version.Compare(other.Version.Vector)
  414. switch vc {
  415. case protocol.Equal:
  416. if e.IsInvalid() != other.IsInvalid() {
  417. if e.IsInvalid() {
  418. return 1
  419. }
  420. return -1
  421. }
  422. // Compare the device ID index, lower is better. This is only
  423. // deterministic to the extent that LocalDeviceID will always be the
  424. // lowest one, order between remote devices is random (and
  425. // irrelevant).
  426. return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
  427. case protocol.Greater: // we are newer
  428. return -1
  429. case protocol.Lesser: // we are older
  430. return 1
  431. case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
  432. if e.IsInvalid() != other.IsInvalid() {
  433. if e.IsInvalid() { // we are invalid, we lose
  434. return 1
  435. }
  436. return -1 // they are invalid, we win
  437. }
  438. if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
  439. return -d // positive d means we were newer, so we win (negative return)
  440. }
  441. if vc == protocol.ConcurrentGreater {
  442. return -1 // we have a better device ID, we win
  443. }
  444. return 1 // they win
  445. default:
  446. return 0
  447. }
  448. }
  449. func (e fileRow) IsInvalid() bool {
  450. return e.LocalFlags.IsInvalid()
  451. }
  452. func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) {
  453. // Induce periodic checkpoints. We add points for each file and block,
  454. // and checkpoint when we've written more than a threshold of points.
  455. // This ensures we do not go too long without a checkpoint, while also
  456. // not doing it incessantly for every update.
  457. s.updatePoints += updatePointsPerFile * len(fs)
  458. for _, f := range fs {
  459. s.updatePoints += len(f.Blocks) * updatePointsPerBlock
  460. }
  461. if s.updatePoints > updatePointsThreshold {
  462. conn, err := s.sql.Conn(context.Background())
  463. if err != nil {
  464. slog.Debug("Connection error", slog.String("db", s.baseName), slogutil.Error(err))
  465. return
  466. }
  467. defer conn.Close()
  468. if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil {
  469. slog.Debug("PRAGMA journal_size_limit error", slog.String("db", s.baseName), slogutil.Error(err))
  470. }
  471. // Every 50th checkpoint becomes a truncate, in an effort to bring
  472. // down the size now and then.
  473. checkpointType := "RESTART"
  474. if s.checkpointsCount > 50 {
  475. checkpointType = "TRUNCATE"
  476. }
  477. cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType)
  478. row := conn.QueryRowContext(context.Background(), cmd)
  479. var res, modified, moved int
  480. if row.Err() != nil {
  481. slog.Debug("Command error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
  482. } else if err := row.Scan(&res, &modified, &moved); err != nil {
  483. slog.Debug("Command scan error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
  484. } else {
  485. slog.Debug("Checkpoint result", "db", s.baseName, "checkpointscount", s.checkpointsCount, "updatepoints", s.updatePoints, "res", res, "modified", modified, "moved", moved)
  486. }
  487. // Reset the truncate counter when a truncate succeeded. If it
  488. // failed, we'll keep trying it until we succeed. Increase it faster
  489. // when we fail to checkpoint, as it's more likely the WAL is
  490. // growing and will need truncation when we get out of this state.
  491. switch {
  492. case res == 1:
  493. s.checkpointsCount += 10
  494. case res == 0 && checkpointType == "TRUNCATE":
  495. s.checkpointsCount = 0
  496. default:
  497. s.checkpointsCount++
  498. }
  499. s.updatePoints = 0
  500. }
  501. }