lowlevel.go 36 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "errors"
  12. "fmt"
  13. "hash/maphash"
  14. "os"
  15. "regexp"
  16. "time"
  17. "github.com/greatroar/blobloom"
  18. "github.com/syncthing/syncthing/lib/db/backend"
  19. "github.com/syncthing/syncthing/lib/events"
  20. "github.com/syncthing/syncthing/lib/fs"
  21. "github.com/syncthing/syncthing/lib/protocol"
  22. "github.com/syncthing/syncthing/lib/sha256"
  23. "github.com/syncthing/syncthing/lib/svcutil"
  24. "github.com/syncthing/syncthing/lib/sync"
  25. "github.com/syncthing/syncthing/lib/util"
  26. "github.com/thejerf/suture/v4"
  27. )
  28. const (
  29. // We set the bloom filter capacity to handle 100k individual items with
  30. // a false positive probability of 1% for the first pass. Once we know
  31. // how many items we have we will use that number instead, if it's more
  32. // than 100k. For fewer than 100k items we will just get better false
  33. // positive rate instead.
  34. indirectGCBloomCapacity = 100000
  35. indirectGCBloomFalsePositiveRate = 0.01 // 1%
  36. indirectGCBloomMaxBytes = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
  37. indirectGCDefaultInterval = 13 * time.Hour
  38. indirectGCTimeKey = "lastIndirectGCTime"
  39. // Use indirection for the block list when it exceeds this many entries
  40. blocksIndirectionCutoff = 3
  41. // Use indirection for the version vector when it exceeds this many entries
  42. versionIndirectionCutoff = 10
  43. recheckDefaultInterval = 30 * 24 * time.Hour
  44. needsRepairSuffix = ".needsrepair"
  45. )
// Lowlevel is the lowest level database interface. It has a very simple
// purpose: hold the actual backend database, and the in-memory state
// that belong to that database. In the same way that a single on disk
// database can only be opened once, there should be only one Lowlevel for
// any given backend.
type Lowlevel struct {
	// Supervisor runs the background services of the database; NewLowlevel
	// registers the indirection GC runner on it.
	*suture.Supervisor
	// Backend is the underlying key-value store; its methods are promoted
	// onto Lowlevel.
	backend.Backend
	// folderIdx and deviceIdx are the small indexes stored under the
	// KeyTypeFolderIdx and KeyTypeDeviceIdx prefixes. The keyer is built
	// from both of them.
	folderIdx *smallIndex
	deviceIdx *smallIndex
	// keyer generates and parses all database keys.
	keyer keyer
	// gcMut is read-held by operations that modify file entries and
	// write-held by the indirection GC while it installs or clears the
	// bloom filters and while it deletes unreferenced items.
	gcMut sync.RWMutex
	// gcKeyCount is the larger of the block list and version counts kept
	// by the previous GC pass; it sizes the bloom filters of the next one.
	gcKeyCount int
	// indirectGCInterval is the time between indirection GC runs.
	indirectGCInterval time.Duration
	// recheckInterval is the maximum age of stored folder metadata before
	// loadMetadataTracker recalculates it.
	recheckInterval time.Duration
	// oneFileSetCreated is created by NewLowlevel; it is not otherwise
	// used in this part of the file.
	// NOTE(review): presumably closed once the first file set exists — confirm.
	oneFileSetCreated chan struct{}
	// evLogger is handed to every metadata tracker created here.
	evLogger events.Logger
	// blockFilter and versionFilter are only non-nil while an indirection
	// GC pass is in progress. Access requires gcMut to be held.
	blockFilter   *bloomFilter
	versionFilter *bloomFilter
}
  66. func NewLowlevel(backend backend.Backend, evLogger events.Logger, opts ...Option) (*Lowlevel, error) {
  67. // Only log restarts in debug mode.
  68. spec := svcutil.SpecWithDebugLogger(l)
  69. db := &Lowlevel{
  70. Supervisor: suture.New("db.Lowlevel", spec),
  71. Backend: backend,
  72. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  73. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  74. gcMut: sync.NewRWMutex(),
  75. indirectGCInterval: indirectGCDefaultInterval,
  76. recheckInterval: recheckDefaultInterval,
  77. oneFileSetCreated: make(chan struct{}),
  78. evLogger: evLogger,
  79. }
  80. for _, opt := range opts {
  81. opt(db)
  82. }
  83. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  84. db.Add(svcutil.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  85. if path := db.needsRepairPath(); path != "" {
  86. if _, err := os.Lstat(path); err == nil {
  87. l.Infoln("Database was marked for repair - this may take a while")
  88. if err := db.checkRepair(); err != nil {
  89. db.handleFailure(err)
  90. return nil, err
  91. }
  92. os.Remove(path)
  93. }
  94. }
  95. return db, nil
  96. }
// Option adjusts a Lowlevel while it is being constructed by NewLowlevel.
type Option func(*Lowlevel)
  98. // WithRecheckInterval sets the time interval in between metadata recalculations
  99. // and consistency checks.
  100. func WithRecheckInterval(dur time.Duration) Option {
  101. return func(db *Lowlevel) {
  102. if dur > 0 {
  103. db.recheckInterval = dur
  104. }
  105. }
  106. }
  107. // WithIndirectGCInterval sets the time interval in between GC runs.
  108. func WithIndirectGCInterval(dur time.Duration) Option {
  109. return func(db *Lowlevel) {
  110. if dur > 0 {
  111. db.indirectGCInterval = dur
  112. }
  113. }
  114. }
  115. // ListFolders returns the list of folders currently in the database
  116. func (db *Lowlevel) ListFolders() []string {
  117. return db.folderIdx.Values()
  118. }
  119. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  120. // global versionlist and metadata.
  121. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  122. db.gcMut.RLock()
  123. defer db.gcMut.RUnlock()
  124. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  125. if err != nil {
  126. return err
  127. }
  128. defer t.close()
  129. var dk, gk, keyBuf []byte
  130. devID, err := protocol.DeviceIDFromBytes(device)
  131. if err != nil {
  132. return err
  133. }
  134. for _, f := range fs {
  135. name := []byte(f.Name)
  136. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  137. if err != nil {
  138. return err
  139. }
  140. ef, ok, err := t.getFileTrunc(dk, true)
  141. if err != nil {
  142. return err
  143. }
  144. if ok && unchanged(f, ef) {
  145. l.Debugf("not inserting unchanged (remote); folder=%q device=%v %v", folder, devID, f)
  146. continue
  147. }
  148. if ok {
  149. meta.removeFile(devID, ef)
  150. }
  151. meta.addFile(devID, f)
  152. l.Debugf("insert (remote); folder=%q device=%v %v", folder, devID, f)
  153. if err := t.putFile(dk, f); err != nil {
  154. return err
  155. }
  156. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  157. if err != nil {
  158. return err
  159. }
  160. keyBuf, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  161. if err != nil {
  162. return err
  163. }
  164. if err := t.Checkpoint(); err != nil {
  165. return err
  166. }
  167. }
  168. return t.Commit()
  169. }
  170. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  171. // metadata, sequence and blockmap buckets.
  172. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  173. db.gcMut.RLock()
  174. defer db.gcMut.RUnlock()
  175. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  176. if err != nil {
  177. return err
  178. }
  179. defer t.close()
  180. var dk, gk, keyBuf []byte
  181. blockBuf := make([]byte, 4)
  182. for _, f := range fs {
  183. name := []byte(f.Name)
  184. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  185. if err != nil {
  186. return err
  187. }
  188. ef, ok, err := t.getFileByKey(dk)
  189. if err != nil {
  190. return err
  191. }
  192. if ok && unchanged(f, ef) {
  193. l.Debugf("not inserting unchanged (local); folder=%q %v", folder, f)
  194. continue
  195. }
  196. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  197. if ok {
  198. if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
  199. for _, block := range ef.Blocks {
  200. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  201. if err != nil {
  202. return err
  203. }
  204. if err := t.Delete(keyBuf); err != nil {
  205. return err
  206. }
  207. }
  208. if !blocksHashSame {
  209. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  210. if err != nil {
  211. return err
  212. }
  213. if err = t.Delete(keyBuf); err != nil {
  214. return err
  215. }
  216. }
  217. }
  218. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  219. if err != nil {
  220. return err
  221. }
  222. if err := t.Delete(keyBuf); err != nil {
  223. return err
  224. }
  225. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  226. }
  227. f.Sequence = meta.nextLocalSeq()
  228. if ok {
  229. meta.removeFile(protocol.LocalDeviceID, ef)
  230. }
  231. meta.addFile(protocol.LocalDeviceID, f)
  232. l.Debugf("insert (local); folder=%q %v", folder, f)
  233. if err := t.putFile(dk, f); err != nil {
  234. return err
  235. }
  236. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  237. if err != nil {
  238. return err
  239. }
  240. keyBuf, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  241. if err != nil {
  242. return err
  243. }
  244. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  245. if err != nil {
  246. return err
  247. }
  248. if err := t.Put(keyBuf, dk); err != nil {
  249. return err
  250. }
  251. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  252. if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
  253. for i, block := range f.Blocks {
  254. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  255. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  256. if err != nil {
  257. return err
  258. }
  259. if err := t.Put(keyBuf, blockBuf); err != nil {
  260. return err
  261. }
  262. }
  263. if !blocksHashSame {
  264. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  265. if err != nil {
  266. return err
  267. }
  268. if err = t.Put(keyBuf, nil); err != nil {
  269. return err
  270. }
  271. }
  272. }
  273. if err := t.Checkpoint(); err != nil {
  274. return err
  275. }
  276. }
  277. return t.Commit()
  278. }
  279. func (db *Lowlevel) dropFolder(folder []byte) error {
  280. db.gcMut.RLock()
  281. defer db.gcMut.RUnlock()
  282. t, err := db.newReadWriteTransaction()
  283. if err != nil {
  284. return err
  285. }
  286. defer t.close()
  287. // Remove all items related to the given folder from the device->file bucket
  288. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  289. if err != nil {
  290. return err
  291. }
  292. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  293. return err
  294. }
  295. // Remove all sequences related to the folder
  296. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  297. if err != nil {
  298. return err
  299. }
  300. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  301. return err
  302. }
  303. // Remove all items related to the given folder from the global bucket
  304. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  305. if err != nil {
  306. return err
  307. }
  308. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  309. return err
  310. }
  311. // Remove all needs related to the folder
  312. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  313. if err != nil {
  314. return err
  315. }
  316. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  317. return err
  318. }
  319. // Remove the blockmap of the folder
  320. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  321. if err != nil {
  322. return err
  323. }
  324. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  325. return err
  326. }
  327. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  328. if err != nil {
  329. return err
  330. }
  331. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  332. return err
  333. }
  334. return t.Commit()
  335. }
  336. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  337. db.gcMut.RLock()
  338. defer db.gcMut.RUnlock()
  339. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  340. if err != nil {
  341. return err
  342. }
  343. defer t.close()
  344. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  345. if err != nil {
  346. return err
  347. }
  348. dbi, err := t.NewPrefixIterator(key)
  349. if err != nil {
  350. return err
  351. }
  352. defer dbi.Release()
  353. var gk, keyBuf []byte
  354. for dbi.Next() {
  355. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  356. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  357. if err != nil {
  358. return err
  359. }
  360. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  361. if err != nil {
  362. return err
  363. }
  364. if err := t.Delete(dbi.Key()); err != nil {
  365. return err
  366. }
  367. if err := t.Checkpoint(); err != nil {
  368. return err
  369. }
  370. }
  371. dbi.Release()
  372. if err := dbi.Error(); err != nil {
  373. return err
  374. }
  375. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  376. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  377. if err != nil {
  378. return err
  379. }
  380. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  381. return err
  382. }
  383. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  384. if err != nil {
  385. return err
  386. }
  387. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  388. return err
  389. }
  390. }
  391. return t.Commit()
  392. }
  393. func (db *Lowlevel) checkGlobals(folderStr string) (int, error) {
  394. t, err := db.newReadWriteTransaction()
  395. if err != nil {
  396. return 0, err
  397. }
  398. defer t.close()
  399. folder := []byte(folderStr)
  400. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  401. if err != nil {
  402. return 0, err
  403. }
  404. dbi, err := t.NewPrefixIterator(key.WithoutName())
  405. if err != nil {
  406. return 0, err
  407. }
  408. defer dbi.Release()
  409. fixed := 0
  410. var dk []byte
  411. ro := t.readOnlyTransaction
  412. for dbi.Next() {
  413. var vl VersionList
  414. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  415. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  416. return 0, err
  417. }
  418. continue
  419. }
  420. // Check the global version list for consistency. An issue in previous
  421. // versions of goleveldb could result in reordered writes so that
  422. // there are global entries pointing to no longer existing files. Here
  423. // we find those and clear them out.
  424. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  425. newVL := &VersionList{}
  426. var changed, changedHere bool
  427. for _, fv := range vl.RawVersions {
  428. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  429. if err != nil {
  430. return 0, err
  431. }
  432. changed = changed || changedHere
  433. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  434. if err != nil {
  435. return 0, err
  436. }
  437. changed = changed || changedHere
  438. }
  439. if newVL.Empty() {
  440. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  441. return 0, err
  442. }
  443. fixed++
  444. } else if changed {
  445. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  446. return 0, err
  447. }
  448. fixed++
  449. }
  450. }
  451. dbi.Release()
  452. if err := dbi.Error(); err != nil {
  453. return 0, err
  454. }
  455. l.Debugf("global db check completed for %v", folder)
  456. return fixed, t.Commit()
  457. }
  458. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  459. var changed bool
  460. var err error
  461. for _, device := range devices {
  462. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  463. if err != nil {
  464. return false, err
  465. }
  466. f, ok, err := t.getFileTrunc(dk, false)
  467. if err != nil {
  468. return false, err
  469. }
  470. if !ok {
  471. changed = true
  472. continue
  473. }
  474. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  475. if err != nil {
  476. return false, err
  477. }
  478. }
  479. return changed, nil
  480. }
  481. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  482. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  483. if err != nil {
  484. return 0, err
  485. }
  486. cur, err := db.Get(key)
  487. if backend.IsNotFound(err) {
  488. return 0, nil
  489. } else if err != nil {
  490. return 0, err
  491. }
  492. var id protocol.IndexID
  493. if err := id.Unmarshal(cur); err != nil {
  494. return 0, nil
  495. }
  496. return id, nil
  497. }
  498. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  499. bs, _ := id.Marshal() // marshalling can't fail
  500. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  501. if err != nil {
  502. return err
  503. }
  504. return db.Put(key, bs)
  505. }
  506. func (db *Lowlevel) dropFolderIndexIDs(folder []byte) error {
  507. t, err := db.newReadWriteTransaction()
  508. if err != nil {
  509. return err
  510. }
  511. defer t.close()
  512. if err := t.deleteKeyPrefixMatching([]byte{KeyTypeIndexID}, func(key []byte) bool {
  513. keyFolder, ok := t.keyer.FolderFromIndexIDKey(key)
  514. if !ok {
  515. l.Debugf("Deleting IndexID with missing FolderIdx: %v", key)
  516. return true
  517. }
  518. return bytes.Equal(keyFolder, folder)
  519. }); err != nil {
  520. return err
  521. }
  522. return t.Commit()
  523. }
  524. func (db *Lowlevel) dropIndexIDs() error {
  525. t, err := db.newReadWriteTransaction()
  526. if err != nil {
  527. return err
  528. }
  529. defer t.close()
  530. if err := t.deleteKeyPrefix([]byte{KeyTypeIndexID}); err != nil {
  531. return err
  532. }
  533. return t.Commit()
  534. }
  535. func (db *Lowlevel) dropMtimes(folder []byte) error {
  536. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  537. if err != nil {
  538. return err
  539. }
  540. return db.dropPrefix(key)
  541. }
  542. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  543. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  544. if err != nil {
  545. return err
  546. }
  547. return db.dropPrefix(key)
  548. }
  549. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  550. t, err := db.newReadWriteTransaction()
  551. if err != nil {
  552. return err
  553. }
  554. defer t.close()
  555. if err := t.deleteKeyPrefix(prefix); err != nil {
  556. return err
  557. }
  558. return t.Commit()
  559. }
  560. func (db *Lowlevel) gcRunner(ctx context.Context) error {
  561. // Calculate the time for the next GC run. Even if we should run GC
  562. // directly, give the system a while to get up and running and do other
  563. // stuff first. (We might have migrations and stuff which would be
  564. // better off running before GC.)
  565. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  566. if next < time.Minute {
  567. next = time.Minute
  568. }
  569. t := time.NewTimer(next)
  570. defer t.Stop()
  571. for {
  572. select {
  573. case <-ctx.Done():
  574. return ctx.Err()
  575. case <-t.C:
  576. if err := db.gcIndirect(ctx); err != nil {
  577. l.Warnln("Database indirection GC failed:", err)
  578. }
  579. db.recordTime(indirectGCTimeKey)
  580. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  581. }
  582. }
  583. }
  584. // recordTime records the current time under the given key, affecting the
  585. // next call to timeUntil with the same key.
  586. func (db *Lowlevel) recordTime(key string) {
  587. miscDB := NewMiscDataNamespace(db)
  588. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  589. }
  590. // timeUntil returns how long we should wait until the next interval, or
  591. // zero if it should happen directly.
  592. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  593. miscDB := NewMiscDataNamespace(db)
  594. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  595. nextTime := time.Unix(lastTime, 0).Add(every)
  596. sleepTime := time.Until(nextTime)
  597. if sleepTime < 0 {
  598. sleepTime = 0
  599. }
  600. return sleepTime
  601. }
  602. func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
  603. // The indirection GC uses bloom filters to track used block lists and
  604. // versions. This means iterating over all items, adding their hashes to
  605. // the filter, then iterating over the indirected items and removing
  606. // those that don't match the filter. The filter will give false
  607. // positives so we will keep around one percent of things that we don't
  608. // really need (at most).
  609. //
  610. // Indirection GC needs to run when there are no modifications to the
  611. // FileInfos or indirected items.
  612. l.Debugln("Starting database GC")
  613. // Create a new set of bloom filters, while holding the gcMut which
  614. // guarantees that no other modifications are happening concurrently.
  615. db.gcMut.Lock()
  616. capacity := indirectGCBloomCapacity
  617. if db.gcKeyCount > capacity {
  618. capacity = db.gcKeyCount
  619. }
  620. db.blockFilter = newBloomFilter(capacity)
  621. db.versionFilter = newBloomFilter(capacity)
  622. db.gcMut.Unlock()
  623. defer func() {
  624. // Forget the bloom filters on the way out.
  625. db.gcMut.Lock()
  626. db.blockFilter = nil
  627. db.versionFilter = nil
  628. db.gcMut.Unlock()
  629. }()
  630. var discardedBlocks, matchedBlocks, discardedVersions, matchedVersions int
  631. t, err := db.newReadWriteTransaction()
  632. if err != nil {
  633. return err
  634. }
  635. defer t.Release()
  636. // Set up the bloom filters with the initial capacity and false positive
  637. // rate, or higher capacity if we've done this before and seen lots of
  638. // items. For simplicity's sake we track just one count, which is the
  639. // highest of the various indirected items.
  640. // Iterate the FileInfos, unmarshal the block and version hashes and
  641. // add them to the filter.
  642. // This happens concurrently with normal database modifications, though
  643. // those modifications will now also add their blocks and versions to
  644. // the bloom filters.
  645. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  646. if err != nil {
  647. return err
  648. }
  649. defer it.Release()
  650. for it.Next() {
  651. select {
  652. case <-ctx.Done():
  653. return ctx.Err()
  654. default:
  655. }
  656. var hashes IndirectionHashesOnly
  657. if err := hashes.Unmarshal(it.Value()); err != nil {
  658. return err
  659. }
  660. db.recordIndirectionHashes(hashes)
  661. }
  662. it.Release()
  663. if err := it.Error(); err != nil {
  664. return err
  665. }
  666. // For the next phase we grab the GC lock again and hold it for the rest
  667. // of the method call. Now there can't be any further modifications to
  668. // the database or the bloom filters.
  669. db.gcMut.Lock()
  670. defer db.gcMut.Unlock()
  671. // Only print something if the process takes more than "a moment".
  672. logWait := make(chan struct{})
  673. logTimer := time.AfterFunc(10*time.Second, func() {
  674. l.Infoln("Database GC in progress - many Syncthing operations will be unresponsive until it's finished")
  675. close(logWait)
  676. })
  677. defer func() {
  678. if logTimer.Stop() {
  679. return
  680. }
  681. <-logWait // Make sure messages are sent in order.
  682. l.Infof("Database GC complete (discarded/remaining: %v/%v blocks, %v/%v versions)",
  683. discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
  684. }()
  685. // Iterate over block lists, removing keys with hashes that don't match
  686. // the filter.
  687. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  688. if err != nil {
  689. return err
  690. }
  691. defer it.Release()
  692. for it.Next() {
  693. select {
  694. case <-ctx.Done():
  695. return ctx.Err()
  696. default:
  697. }
  698. key := blockListKey(it.Key())
  699. if db.blockFilter.has(key.Hash()) {
  700. matchedBlocks++
  701. continue
  702. }
  703. if err := t.Delete(key); err != nil {
  704. return err
  705. }
  706. discardedBlocks++
  707. }
  708. it.Release()
  709. if err := it.Error(); err != nil {
  710. return err
  711. }
  712. // Iterate over version lists, removing keys with hashes that don't match
  713. // the filter.
  714. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  715. if err != nil {
  716. return err
  717. }
  718. for it.Next() {
  719. select {
  720. case <-ctx.Done():
  721. return ctx.Err()
  722. default:
  723. }
  724. key := versionKey(it.Key())
  725. if db.versionFilter.has(key.Hash()) {
  726. matchedVersions++
  727. continue
  728. }
  729. if err := t.Delete(key); err != nil {
  730. return err
  731. }
  732. discardedVersions++
  733. }
  734. it.Release()
  735. if err := it.Error(); err != nil {
  736. return err
  737. }
  738. // Remember the number of unique keys we kept until the next pass.
  739. db.gcKeyCount = matchedBlocks
  740. if matchedVersions > matchedBlocks {
  741. db.gcKeyCount = matchedVersions
  742. }
  743. if err := t.Commit(); err != nil {
  744. return err
  745. }
  746. l.Debugf("Finished GC (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
  747. return nil
  748. }
  749. func (db *Lowlevel) recordIndirectionHashesForFile(f *protocol.FileInfo) {
  750. db.recordIndirectionHashes(IndirectionHashesOnly{BlocksHash: f.BlocksHash, VersionHash: f.VersionHash})
  751. }
  752. func (db *Lowlevel) recordIndirectionHashes(hs IndirectionHashesOnly) {
  753. // must be called with gcMut held (at least read-held)
  754. if db.blockFilter != nil && len(hs.BlocksHash) > 0 {
  755. db.blockFilter.add(hs.BlocksHash)
  756. }
  757. if db.versionFilter != nil && len(hs.VersionHash) > 0 {
  758. db.versionFilter.add(hs.VersionHash)
  759. }
  760. }
  761. func newBloomFilter(capacity int) *bloomFilter {
  762. return &bloomFilter{
  763. f: blobloom.NewSyncOptimized(blobloom.Config{
  764. Capacity: uint64(capacity),
  765. FPRate: indirectGCBloomFalsePositiveRate,
  766. MaxBits: 8 * indirectGCBloomMaxBytes,
  767. }),
  768. seed: maphash.MakeSeed(),
  769. }
  770. }
  771. type bloomFilter struct {
  772. f *blobloom.SyncFilter
  773. seed maphash.Seed
  774. }
  775. func (b *bloomFilter) add(id []byte) { b.f.Add(b.hash(id)) }
  776. func (b *bloomFilter) has(id []byte) bool { return b.f.Has(b.hash(id)) }
  777. // Hash function for the bloomfilter: maphash of the SHA-256.
  778. //
  779. // The randomization in maphash should ensure that we get different collisions
  780. // across runs, so colliding keys are not kept indefinitely.
  781. func (b *bloomFilter) hash(id []byte) uint64 {
  782. if len(id) != sha256.Size {
  783. panic("bug: bloomFilter.hash passed something not a SHA256 hash")
  784. }
  785. var h maphash.Hash
  786. h.SetSeed(b.seed)
  787. h.Write(id)
  788. return h.Sum64()
  789. }
  790. // checkRepair checks folder metadata and sequences for miscellaneous errors.
  791. func (db *Lowlevel) checkRepair() error {
  792. db.gcMut.RLock()
  793. defer db.gcMut.RUnlock()
  794. for _, folder := range db.ListFolders() {
  795. if _, err := db.getMetaAndCheckGCLocked(folder); err != nil {
  796. return err
  797. }
  798. }
  799. return nil
  800. }
  801. func (db *Lowlevel) getMetaAndCheck(folder string) (*metadataTracker, error) {
  802. db.gcMut.RLock()
  803. defer db.gcMut.RUnlock()
  804. return db.getMetaAndCheckGCLocked(folder)
  805. }
  806. func (db *Lowlevel) getMetaAndCheckGCLocked(folder string) (*metadataTracker, error) {
  807. fixed, err := db.checkLocalNeed([]byte(folder))
  808. if err != nil {
  809. return nil, fmt.Errorf("checking local need: %w", err)
  810. }
  811. if fixed != 0 {
  812. l.Infof("Repaired %d local need entries for folder %v in database", fixed, folder)
  813. }
  814. fixed, err = db.checkGlobals(folder)
  815. if err != nil {
  816. return nil, fmt.Errorf("checking globals: %w", err)
  817. }
  818. if fixed != 0 {
  819. l.Infof("Repaired %d global entries for folder %v in database", fixed, folder)
  820. }
  821. oldMeta := newMetadataTracker(db.keyer, db.evLogger)
  822. _ = oldMeta.fromDB(db, []byte(folder)) // Ignore error, it leads to index id reset too
  823. meta, err := db.recalcMeta(folder)
  824. if err != nil {
  825. return nil, fmt.Errorf("recalculating metadata: %w", err)
  826. }
  827. fixed, err = db.repairSequenceGCLocked(folder, meta)
  828. if err != nil {
  829. return nil, fmt.Errorf("repairing sequences: %w", err)
  830. }
  831. if fixed != 0 {
  832. l.Infof("Repaired %d sequence entries for folder %v in database", fixed, folder)
  833. meta, err = db.recalcMeta(folder)
  834. if err != nil {
  835. return nil, fmt.Errorf("recalculating metadata: %w", err)
  836. }
  837. }
  838. if err := db.checkSequencesUnchanged(folder, oldMeta, meta); err != nil {
  839. return nil, fmt.Errorf("checking for changed sequences: %w", err)
  840. }
  841. return meta, nil
  842. }
  843. func (db *Lowlevel) loadMetadataTracker(folder string) (*metadataTracker, error) {
  844. meta := newMetadataTracker(db.keyer, db.evLogger)
  845. if err := meta.fromDB(db, []byte(folder)); err != nil {
  846. if err == errMetaInconsistent {
  847. l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
  848. } else {
  849. l.Infof("No stored folder metadata for %q; recalculating", folder)
  850. }
  851. return db.getMetaAndCheck(folder)
  852. }
  853. curSeq := meta.Sequence(protocol.LocalDeviceID)
  854. if metaOK, err := db.verifyLocalSequence(curSeq, folder); err != nil {
  855. return nil, fmt.Errorf("verifying sequences: %w", err)
  856. } else if !metaOK {
  857. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  858. return db.getMetaAndCheck(folder)
  859. }
  860. if age := time.Since(meta.Created()); age > db.recheckInterval {
  861. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, util.NiceDurationString(age))
  862. return db.getMetaAndCheck(folder)
  863. }
  864. return meta, nil
  865. }
  866. func (db *Lowlevel) recalcMeta(folderStr string) (*metadataTracker, error) {
  867. folder := []byte(folderStr)
  868. meta := newMetadataTracker(db.keyer, db.evLogger)
  869. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  870. if err != nil {
  871. return nil, err
  872. }
  873. defer t.close()
  874. var deviceID protocol.DeviceID
  875. err = t.withAllFolderTruncated(folder, func(device []byte, f FileInfoTruncated) bool {
  876. copy(deviceID[:], device)
  877. meta.addFile(deviceID, f)
  878. return true
  879. })
  880. if err != nil {
  881. return nil, err
  882. }
  883. err = t.withGlobal(folder, nil, true, func(f protocol.FileIntf) bool {
  884. meta.addFile(protocol.GlobalDeviceID, f)
  885. return true
  886. })
  887. if err != nil {
  888. return nil, err
  889. }
  890. meta.emptyNeeded(protocol.LocalDeviceID)
  891. err = t.withNeed(folder, protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  892. meta.addNeeded(protocol.LocalDeviceID, f)
  893. return true
  894. })
  895. if err != nil {
  896. return nil, err
  897. }
  898. for _, device := range meta.devices() {
  899. meta.emptyNeeded(device)
  900. err = t.withNeed(folder, device[:], true, func(f protocol.FileIntf) bool {
  901. meta.addNeeded(device, f)
  902. return true
  903. })
  904. if err != nil {
  905. return nil, err
  906. }
  907. }
  908. meta.SetCreated()
  909. if err := t.Commit(); err != nil {
  910. return nil, err
  911. }
  912. return meta, nil
  913. }
  914. // Verify the local sequence number from actual sequence entries. Returns
  915. // true if it was all good, or false if a fixup was necessary.
  916. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) (bool, error) {
  917. // Walk the sequence index from the current (supposedly) highest
  918. // sequence number and raise the alarm if we get anything. This recovers
  919. // from the occasion where we have written sequence entries to disk but
  920. // not yet written new metadata to disk.
  921. //
  922. // Note that we can have the same thing happen for remote devices but
  923. // there it's not a problem -- we'll simply advertise a lower sequence
  924. // number than we've actually seen and receive some duplicate updates
  925. // and then be in sync again.
  926. t, err := db.newReadOnlyTransaction()
  927. if err != nil {
  928. return false, err
  929. }
  930. ok := true
  931. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  932. ok = false // we got something, which we should not have
  933. return false
  934. }); err != nil {
  935. return false, err
  936. }
  937. t.close()
  938. return ok, nil
  939. }
  940. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  941. // match those in the corresponding file entries. It returns the amount of fixed
  942. // entries.
  943. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  944. t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
  945. if err != nil {
  946. return 0, err
  947. }
  948. defer t.close()
  949. fixed := 0
  950. folder := []byte(folderStr)
  951. // First check that every file entry has a matching sequence entry
  952. // (this was previously db schema upgrade to 9).
  953. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  954. if err != nil {
  955. return 0, err
  956. }
  957. it, err := t.NewPrefixIterator(dk.WithoutName())
  958. if err != nil {
  959. return 0, err
  960. }
  961. defer it.Release()
  962. var sk sequenceKey
  963. for it.Next() {
  964. intf, err := t.unmarshalTrunc(it.Value(), false)
  965. if err != nil {
  966. // Delete local items with invalid indirected blocks/versions.
  967. // They will be rescanned.
  968. var ierr *blocksIndirectionError
  969. if ok := errors.As(err, &ierr); ok && backend.IsNotFound(err) {
  970. intf, err = t.unmarshalTrunc(it.Value(), true)
  971. if err != nil {
  972. return 0, err
  973. }
  974. name := []byte(intf.FileName())
  975. gk, err := t.keyer.GenerateGlobalVersionKey(nil, folder, name)
  976. if err != nil {
  977. return 0, err
  978. }
  979. _, err = t.removeFromGlobal(gk, nil, folder, protocol.LocalDeviceID[:], name, nil)
  980. if err != nil {
  981. return 0, err
  982. }
  983. sk, err = db.keyer.GenerateSequenceKey(sk, folder, intf.SequenceNo())
  984. if err != nil {
  985. return 0, err
  986. }
  987. if err := t.Delete(sk); err != nil {
  988. return 0, err
  989. }
  990. if err := t.Delete(it.Key()); err != nil {
  991. return 0, err
  992. }
  993. }
  994. return 0, err
  995. }
  996. fi := intf.(protocol.FileInfo)
  997. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  998. return 0, err
  999. }
  1000. switch dk, err = t.Get(sk); {
  1001. case err != nil:
  1002. if !backend.IsNotFound(err) {
  1003. return 0, err
  1004. }
  1005. fallthrough
  1006. case !bytes.Equal(it.Key(), dk):
  1007. fixed++
  1008. fi.Sequence = meta.nextLocalSeq()
  1009. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  1010. return 0, err
  1011. }
  1012. if err := t.Put(sk, it.Key()); err != nil {
  1013. return 0, err
  1014. }
  1015. if err := t.putFile(it.Key(), fi); err != nil {
  1016. return 0, err
  1017. }
  1018. }
  1019. if err := t.Checkpoint(); err != nil {
  1020. return 0, err
  1021. }
  1022. }
  1023. if err := it.Error(); err != nil {
  1024. return 0, err
  1025. }
  1026. it.Release()
  1027. // Secondly check there's no sequence entries pointing at incorrect things.
  1028. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  1029. if err != nil {
  1030. return 0, err
  1031. }
  1032. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  1033. if err != nil {
  1034. return 0, err
  1035. }
  1036. defer it.Release()
  1037. for it.Next() {
  1038. // Check that the sequence from the key matches the
  1039. // sequence in the file.
  1040. fi, ok, err := t.getFileTrunc(it.Value(), true)
  1041. if err != nil {
  1042. return 0, err
  1043. }
  1044. if ok {
  1045. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  1046. continue
  1047. }
  1048. }
  1049. // Either the file is missing or has a different sequence number
  1050. fixed++
  1051. if err := t.Delete(it.Key()); err != nil {
  1052. return 0, err
  1053. }
  1054. }
  1055. if err := it.Error(); err != nil {
  1056. return 0, err
  1057. }
  1058. it.Release()
  1059. return fixed, t.Commit()
  1060. }
  1061. // Does not take care of metadata - if anything is repaired, the need count
  1062. // needs to be recalculated.
  1063. func (db *Lowlevel) checkLocalNeed(folder []byte) (int, error) {
  1064. repaired := 0
  1065. t, err := db.newReadWriteTransaction()
  1066. if err != nil {
  1067. return 0, err
  1068. }
  1069. defer t.close()
  1070. key, err := t.keyer.GenerateNeedFileKey(nil, folder, nil)
  1071. if err != nil {
  1072. return 0, err
  1073. }
  1074. dbi, err := t.NewPrefixIterator(key.WithoutName())
  1075. if err != nil {
  1076. return 0, err
  1077. }
  1078. defer dbi.Release()
  1079. var needName string
  1080. var needDone bool
  1081. next := func() {
  1082. needDone = !dbi.Next()
  1083. if !needDone {
  1084. needName = string(t.keyer.NameFromGlobalVersionKey(dbi.Key()))
  1085. }
  1086. }
  1087. next()
  1088. t.withNeedIteratingGlobal(folder, protocol.LocalDeviceID[:], true, func(fi protocol.FileIntf) bool {
  1089. f := fi.(FileInfoTruncated)
  1090. for !needDone && needName < f.Name {
  1091. repaired++
  1092. if err = t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  1093. return false
  1094. }
  1095. l.Debugln("check local need: removing", needName)
  1096. next()
  1097. }
  1098. if needName == f.Name {
  1099. next()
  1100. } else {
  1101. repaired++
  1102. key, err = t.keyer.GenerateNeedFileKey(key, folder, []byte(f.Name))
  1103. if err != nil {
  1104. return false
  1105. }
  1106. if err = t.Put(key, nil); err != nil {
  1107. return false
  1108. }
  1109. l.Debugln("check local need: adding", f.Name)
  1110. }
  1111. return true
  1112. })
  1113. if err != nil {
  1114. return 0, err
  1115. }
  1116. for !needDone {
  1117. repaired++
  1118. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  1119. return 0, err
  1120. }
  1121. l.Debugln("check local need: removing", needName)
  1122. next()
  1123. }
  1124. if err := dbi.Error(); err != nil {
  1125. return 0, err
  1126. }
  1127. dbi.Release()
  1128. if err = t.Commit(); err != nil {
  1129. return 0, err
  1130. }
  1131. return repaired, nil
  1132. }
  1133. // checkSequencesUnchanged resets delta indexes for any device where the
  1134. // sequence changed.
  1135. func (db *Lowlevel) checkSequencesUnchanged(folder string, oldMeta, meta *metadataTracker) error {
  1136. t, err := db.newReadWriteTransaction()
  1137. if err != nil {
  1138. return err
  1139. }
  1140. defer t.close()
  1141. var key []byte
  1142. deleteIndexID := func(devID protocol.DeviceID) error {
  1143. key, err = db.keyer.GenerateIndexIDKey(key, devID[:], []byte(folder))
  1144. if err != nil {
  1145. return err
  1146. }
  1147. return t.Delete(key)
  1148. }
  1149. if oldMeta.Sequence(protocol.LocalDeviceID) != meta.Sequence(protocol.LocalDeviceID) {
  1150. if err := deleteIndexID(protocol.LocalDeviceID); err != nil {
  1151. return err
  1152. }
  1153. l.Infof("Local sequence for folder %v changed while repairing - dropping delta indexes", folder)
  1154. }
  1155. oldDevices := oldMeta.devices()
  1156. oldSequences := make(map[protocol.DeviceID]int64, len(oldDevices))
  1157. for _, devID := range oldDevices {
  1158. oldSequences[devID] = oldMeta.Sequence(devID)
  1159. }
  1160. for _, devID := range meta.devices() {
  1161. oldSeq := oldSequences[devID]
  1162. delete(oldSequences, devID)
  1163. // A lower sequence number just means we will receive some indexes again.
  1164. if oldSeq >= meta.Sequence(devID) {
  1165. if oldSeq > meta.Sequence(devID) {
  1166. db.evLogger.Log(events.Failure, "lower remote sequence after recalculating metadata")
  1167. }
  1168. continue
  1169. }
  1170. db.evLogger.Log(events.Failure, "higher remote sequence after recalculating metadata")
  1171. if err := deleteIndexID(devID); err != nil {
  1172. return err
  1173. }
  1174. l.Infof("Sequence of device %v for folder %v changed while repairing - dropping delta indexes", devID.Short(), folder)
  1175. }
  1176. for devID := range oldSequences {
  1177. if err := deleteIndexID(devID); err != nil {
  1178. return err
  1179. }
  1180. l.Debugf("Removed indexID of device %v for folder %v which isn't present anymore", devID.Short(), folder)
  1181. }
  1182. return t.Commit()
  1183. }
  1184. func (db *Lowlevel) needsRepairPath() string {
  1185. path := db.Location()
  1186. if path == "" {
  1187. return ""
  1188. }
  1189. if path[len(path)-1] == fs.PathSeparator {
  1190. path = path[:len(path)-1]
  1191. }
  1192. return path + needsRepairSuffix
  1193. }
  1194. func (db *Lowlevel) checkErrorForRepair(err error) {
  1195. if errors.Is(err, errEntryFromGlobalMissing) || errors.Is(err, errEmptyGlobal) {
  1196. // Inconsistency error, mark db for repair on next start.
  1197. if path := db.needsRepairPath(); path != "" {
  1198. if fd, err := os.Create(path); err == nil {
  1199. fd.Close()
  1200. }
  1201. }
  1202. }
  1203. }
  1204. // unchanged checks if two files are the same and thus don't need to be updated.
  1205. // Local flags or the invalid bit might change without the version
  1206. // being bumped.
  1207. func unchanged(nf, ef protocol.FileIntf) bool {
  1208. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  1209. }
  1210. func (db *Lowlevel) handleFailure(err error) {
  1211. db.checkErrorForRepair(err)
  1212. if shouldReportFailure(err) {
  1213. db.evLogger.Log(events.Failure, err.Error())
  1214. }
  1215. }
  1216. var ldbPathRe = regexp.MustCompile(`(open|write|read) .+[\\/].+[\\/]index[^\\/]+[\\/][^\\/]+: `)
  1217. func shouldReportFailure(err error) bool {
  1218. return !ldbPathRe.MatchString(err.Error())
  1219. }