folderdb_update.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. // Copyright (C) 2025 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package sqlite
  7. import (
  8. "cmp"
  9. "context"
  10. "fmt"
  11. "log/slog"
  12. "slices"
  13. "github.com/jmoiron/sqlx"
  14. "github.com/syncthing/syncthing/internal/gen/dbproto"
  15. "github.com/syncthing/syncthing/internal/itererr"
  16. "github.com/syncthing/syncthing/internal/slogutil"
  17. "github.com/syncthing/syncthing/lib/osutil"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. "github.com/syncthing/syncthing/lib/sliceutil"
  20. "google.golang.org/protobuf/proto"
  21. )
  22. const (
  23. // Arbitrarily chosen values for checkpoint frequency....
  24. updatePointsPerFile = 100
  25. updatePointsPerBlock = 1
  26. updatePointsThreshold = 250_000
  27. )
  28. func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error {
  29. s.updateLock.Lock()
  30. defer s.updateLock.Unlock()
  31. deviceIdx, err := s.deviceIdxLocked(device)
  32. if err != nil {
  33. return wrap(err)
  34. }
  35. tx, err := s.sql.BeginTxx(context.Background(), nil)
  36. if err != nil {
  37. return wrap(err)
  38. }
  39. defer tx.Rollback() //nolint:errcheck
  40. txp := &txPreparedStmts{Tx: tx}
  41. //nolint:sqlclosecheck
  42. insertFileStmt, err := txp.Preparex(`
  43. INSERT OR REPLACE INTO files (device_idx, remote_sequence, name, type, modified, size, version, deleted, local_flags, blocklist_hash)
  44. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  45. RETURNING sequence
  46. `)
  47. if err != nil {
  48. return wrap(err, "prepare insert file")
  49. }
  50. //nolint:sqlclosecheck
  51. insertFileInfoStmt, err := txp.Preparex(`
  52. INSERT INTO fileinfos (sequence, fiprotobuf)
  53. VALUES (?, ?)
  54. `)
  55. if err != nil {
  56. return wrap(err, "prepare insert fileinfo")
  57. }
  58. //nolint:sqlclosecheck
  59. insertBlockListStmt, err := txp.Preparex(`
  60. INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
  61. VALUES (?, ?)
  62. `)
  63. if err != nil {
  64. return wrap(err, "prepare insert blocklist")
  65. }
  66. var prevRemoteSeq int64
  67. for i, f := range fs {
  68. f.Name = osutil.NormalizedFilename(f.Name)
  69. var blockshash *[]byte
  70. if len(f.Blocks) > 0 {
  71. f.BlocksHash = protocol.BlocksHash(f.Blocks)
  72. blockshash = &f.BlocksHash
  73. } else {
  74. f.BlocksHash = nil
  75. }
  76. if f.Type == protocol.FileInfoTypeDirectory {
  77. f.Size = 128 // synthetic directory size
  78. }
  79. // Insert the file.
  80. //
  81. // If it is a remote file, set remote_sequence otherwise leave it at
  82. // null. Returns the new local sequence.
  83. var remoteSeq *int64
  84. if device != protocol.LocalDeviceID {
  85. if i > 0 && f.Sequence == prevRemoteSeq {
  86. return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
  87. }
  88. prevRemoteSeq = f.Sequence
  89. remoteSeq = &f.Sequence
  90. }
  91. var localSeq int64
  92. if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Name, f.Type, f.ModTime().UnixNano(), f.Size, f.Version.String(), f.IsDeleted(), f.LocalFlags, blockshash); err != nil {
  93. return wrap(err, "insert file")
  94. }
  95. if len(f.Blocks) > 0 {
  96. // Indirect the block list
  97. blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
  98. bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
  99. if err != nil {
  100. return wrap(err, "marshal blocklist")
  101. }
  102. if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
  103. return wrap(err, "insert blocklist")
  104. }
  105. if device == protocol.LocalDeviceID {
  106. // Insert all blocks
  107. if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
  108. return wrap(err, "insert blocks")
  109. }
  110. }
  111. f.Blocks = nil
  112. }
  113. // Insert the fileinfo
  114. if device == protocol.LocalDeviceID {
  115. f.Sequence = localSeq
  116. }
  117. bs, err := proto.Marshal(f.ToWire(true))
  118. if err != nil {
  119. return wrap(err, "marshal fileinfo")
  120. }
  121. if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
  122. return wrap(err, "insert fileinfo")
  123. }
  124. // Update global and need
  125. if err := s.recalcGlobalForFile(txp, f.Name); err != nil {
  126. return wrap(err)
  127. }
  128. }
  129. if err := tx.Commit(); err != nil {
  130. return wrap(err)
  131. }
  132. s.periodicCheckpointLocked(fs)
  133. return nil
  134. }
  135. func (s *folderDB) DropDevice(device protocol.DeviceID) error {
  136. if device == protocol.LocalDeviceID {
  137. panic("bug: cannot drop local device")
  138. }
  139. s.updateLock.Lock()
  140. defer s.updateLock.Unlock()
  141. tx, err := s.sql.BeginTxx(context.Background(), nil)
  142. if err != nil {
  143. return wrap(err)
  144. }
  145. defer tx.Rollback() //nolint:errcheck
  146. txp := &txPreparedStmts{Tx: tx}
  147. // Drop the device, which cascades to delete all files etc for it
  148. if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
  149. return wrap(err)
  150. }
  151. // Recalc the globals for all affected folders
  152. if err := s.recalcGlobalForFolder(txp); err != nil {
  153. return wrap(err)
  154. }
  155. return wrap(tx.Commit())
  156. }
  157. func (s *folderDB) DropAllFiles(device protocol.DeviceID) error {
  158. s.updateLock.Lock()
  159. defer s.updateLock.Unlock()
  160. // This is a two part operation, first dropping all the files and then
  161. // recalculating the global state for the entire folder.
  162. deviceIdx, err := s.deviceIdxLocked(device)
  163. if err != nil {
  164. return wrap(err)
  165. }
  166. tx, err := s.sql.BeginTxx(context.Background(), nil)
  167. if err != nil {
  168. return wrap(err)
  169. }
  170. defer tx.Rollback() //nolint:errcheck
  171. txp := &txPreparedStmts{Tx: tx}
  172. // Drop all the file entries
  173. result, err := tx.Exec(`
  174. DELETE FROM files
  175. WHERE device_idx = ?
  176. `, deviceIdx)
  177. if err != nil {
  178. return wrap(err)
  179. }
  180. if n, err := result.RowsAffected(); err == nil && n == 0 {
  181. // The delete affected no rows, so we don't need to redo the entire
  182. // global/need calculation.
  183. return wrap(tx.Commit())
  184. }
  185. // Recalc global for the entire folder
  186. if err := s.recalcGlobalForFolder(txp); err != nil {
  187. return wrap(err)
  188. }
  189. return wrap(tx.Commit())
  190. }
  191. func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error {
  192. for i := range names {
  193. names[i] = osutil.NormalizedFilename(names[i])
  194. }
  195. s.updateLock.Lock()
  196. defer s.updateLock.Unlock()
  197. deviceIdx, err := s.deviceIdxLocked(device)
  198. if err != nil {
  199. return wrap(err)
  200. }
  201. tx, err := s.sql.BeginTxx(context.Background(), nil)
  202. if err != nil {
  203. return wrap(err)
  204. }
  205. defer tx.Rollback() //nolint:errcheck
  206. txp := &txPreparedStmts{Tx: tx}
  207. // Drop the named files
  208. query, args, err := sqlx.In(`
  209. DELETE FROM files
  210. WHERE device_idx = ? AND name IN (?)
  211. `, deviceIdx, names)
  212. if err != nil {
  213. return wrap(err)
  214. }
  215. if _, err := tx.Exec(query, args...); err != nil {
  216. return wrap(err)
  217. }
  218. // Recalc globals for the named files
  219. for _, name := range names {
  220. if err := s.recalcGlobalForFile(txp, name); err != nil {
  221. return wrap(err)
  222. }
  223. }
  224. return wrap(tx.Commit())
  225. }
  226. func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
  227. if len(blocks) == 0 {
  228. return nil
  229. }
  230. bs := make([]map[string]any, len(blocks))
  231. for i, b := range blocks {
  232. bs[i] = map[string]any{
  233. "hash": b.Hash,
  234. "blocklist_hash": blocklistHash,
  235. "idx": i,
  236. "offset": b.Offset,
  237. "size": b.Size,
  238. }
  239. }
  240. // Very large block lists (>8000 blocks) result in "too many variables"
  241. // error. Chunk it to a reasonable size.
  242. for chunk := range slices.Chunk(bs, 1000) {
  243. if _, err := tx.NamedExec(`
  244. INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
  245. VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
  246. `, chunk); err != nil {
  247. return wrap(err)
  248. }
  249. }
  250. return nil
  251. }
  252. func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error {
  253. // Select files where there is no global, those are the ones we need to
  254. // recalculate.
  255. //nolint:sqlclosecheck
  256. namesStmt, err := txp.Preparex(`
  257. SELECT f.name FROM files f
  258. WHERE NOT EXISTS (
  259. SELECT 1 FROM files g
  260. WHERE g.name = f.name AND g.local_flags & ? != 0
  261. )
  262. GROUP BY name
  263. `)
  264. if err != nil {
  265. return wrap(err)
  266. }
  267. rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal)
  268. if err != nil {
  269. return wrap(err)
  270. }
  271. defer rows.Close()
  272. for rows.Next() {
  273. var name string
  274. if err := rows.Scan(&name); err != nil {
  275. return wrap(err)
  276. }
  277. if err := s.recalcGlobalForFile(txp, name); err != nil {
  278. return wrap(err)
  279. }
  280. }
  281. return wrap(rows.Err())
  282. }
  283. func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error {
  284. //nolint:sqlclosecheck
  285. selStmt, err := txp.Preparex(`
  286. SELECT name, device_idx, sequence, modified, version, deleted, local_flags FROM files
  287. WHERE name = ?
  288. `)
  289. if err != nil {
  290. return wrap(err)
  291. }
  292. es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file)))
  293. if err != nil {
  294. return wrap(err)
  295. }
  296. if len(es) == 0 {
  297. // shouldn't happen
  298. return nil
  299. }
  300. // Sort the entries; the global entry is at the head of the list
  301. slices.SortFunc(es, fileRow.Compare)
  302. // The global version is the first one in the list that is not invalid,
  303. // or just the first one in the list if all are invalid.
  304. var global fileRow
  305. globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.IsInvalid() })
  306. if globIdx < 0 {
  307. globIdx = 0
  308. }
  309. global = es[globIdx]
  310. // We "have" the file if the position in the list of versions is at the
  311. // global version or better, or if the version is the same as the global
  312. // file (we might be further down the list due to invalid flags), or if
  313. // the global is deleted and we don't have it at all...
  314. localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
  315. hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
  316. localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
  317. localIdx < 0 && global.Deleted // missing it, but the global is also deleted
  318. // Set the global flag on the global entry. Set the need flag if the
  319. // local device needs this file, unless it's invalid.
  320. global.LocalFlags |= protocol.FlagLocalGlobal
  321. if hasLocal || global.IsInvalid() {
  322. global.LocalFlags &= ^protocol.FlagLocalNeeded
  323. } else {
  324. global.LocalFlags |= protocol.FlagLocalNeeded
  325. }
  326. //nolint:sqlclosecheck
  327. upStmt, err := txp.Preparex(`
  328. UPDATE files SET local_flags = ?
  329. WHERE device_idx = ? AND sequence = ?
  330. `)
  331. if err != nil {
  332. return wrap(err)
  333. }
  334. if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil {
  335. return wrap(err)
  336. }
  337. // Clear the need and global flags on all other entries
  338. //nolint:sqlclosecheck
  339. upStmt, err = txp.Preparex(`
  340. UPDATE files SET local_flags = local_flags & ?
  341. WHERE name = ? AND sequence != ? AND local_flags & ? != 0
  342. `)
  343. if err != nil {
  344. return wrap(err)
  345. }
  346. if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
  347. return wrap(err)
  348. }
  349. return nil
  350. }
  351. func (s *DB) folderIdxLocked(folderID string) (int64, error) {
  352. if _, err := s.stmt(`
  353. INSERT OR IGNORE INTO folders(folder_id)
  354. VALUES (?)
  355. `).Exec(folderID); err != nil {
  356. return 0, wrap(err)
  357. }
  358. var idx int64
  359. if err := s.stmt(`
  360. SELECT idx FROM folders
  361. WHERE folder_id = ?
  362. `).Get(&idx, folderID); err != nil {
  363. return 0, wrap(err)
  364. }
  365. return idx, nil
  366. }
  367. type fileRow struct {
  368. Name string
  369. Version dbVector
  370. DeviceIdx int64 `db:"device_idx"`
  371. Sequence int64
  372. Modified int64
  373. Size int64
  374. LocalFlags protocol.FlagLocal `db:"local_flags"`
  375. Deleted bool
  376. }
  377. func (e fileRow) Compare(other fileRow) int {
  378. // From FileInfo.WinsConflict
  379. vc := e.Version.Compare(other.Version.Vector)
  380. switch vc {
  381. case protocol.Equal:
  382. if e.IsInvalid() != other.IsInvalid() {
  383. if e.IsInvalid() {
  384. return 1
  385. }
  386. return -1
  387. }
  388. // Compare the device ID index, lower is better. This is only
  389. // deterministic to the extent that LocalDeviceID will always be the
  390. // lowest one, order between remote devices is random (and
  391. // irrelevant).
  392. return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
  393. case protocol.Greater: // we are newer
  394. return -1
  395. case protocol.Lesser: // we are older
  396. return 1
  397. case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
  398. if e.IsInvalid() != other.IsInvalid() {
  399. if e.IsInvalid() { // we are invalid, we lose
  400. return 1
  401. }
  402. return -1 // they are invalid, we win
  403. }
  404. if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
  405. return -d // positive d means we were newer, so we win (negative return)
  406. }
  407. if vc == protocol.ConcurrentGreater {
  408. return -1 // we have a better device ID, we win
  409. }
  410. return 1 // they win
  411. default:
  412. return 0
  413. }
  414. }
  415. func (e fileRow) IsInvalid() bool {
  416. return e.LocalFlags.IsInvalid()
  417. }
  418. func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) {
  419. // Induce periodic checkpoints. We add points for each file and block,
  420. // and checkpoint when we've written more than a threshold of points.
  421. // This ensures we do not go too long without a checkpoint, while also
  422. // not doing it incessantly for every update.
  423. s.updatePoints += updatePointsPerFile * len(fs)
  424. for _, f := range fs {
  425. s.updatePoints += len(f.Blocks) * updatePointsPerBlock
  426. }
  427. if s.updatePoints > updatePointsThreshold {
  428. conn, err := s.sql.Conn(context.Background())
  429. if err != nil {
  430. slog.Debug("Connection error", slog.String("db", s.baseName), slogutil.Error(err))
  431. return
  432. }
  433. defer conn.Close()
  434. if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil {
  435. slog.Debug("PRAGMA journal_size_limit error", slog.String("db", s.baseName), slogutil.Error(err))
  436. }
  437. // Every 50th checkpoint becomes a truncate, in an effort to bring
  438. // down the size now and then.
  439. checkpointType := "RESTART"
  440. if s.checkpointsCount > 50 {
  441. checkpointType = "TRUNCATE"
  442. }
  443. cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType)
  444. row := conn.QueryRowContext(context.Background(), cmd)
  445. var res, modified, moved int
  446. if row.Err() != nil {
  447. slog.Debug("Command error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
  448. } else if err := row.Scan(&res, &modified, &moved); err != nil {
  449. slog.Debug("Command scan error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
  450. } else {
  451. slog.Debug("Checkpoint result", "db", s.baseName, "checkpointscount", s.checkpointsCount, "updatepoints", s.updatePoints, "res", res, "modified", modified, "moved", moved)
  452. }
  453. // Reset the truncate counter when a truncate succeeded. If it
  454. // failed, we'll keep trying it until we succeed. Increase it faster
  455. // when we fail to checkpoint, as it's more likely the WAL is
  456. // growing and will need truncation when we get out of this state.
  457. switch {
  458. case res == 1:
  459. s.checkpointsCount += 10
  460. case res == 0 && checkpointType == "TRUNCATE":
  461. s.checkpointsCount = 0
  462. default:
  463. s.checkpointsCount++
  464. }
  465. s.updatePoints = 0
  466. }
  467. }