lowlevel.go 36 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "os"
  15. "regexp"
  16. "time"
  17. "github.com/dchest/siphash"
  18. "github.com/greatroar/blobloom"
  19. "github.com/syncthing/syncthing/lib/db/backend"
  20. "github.com/syncthing/syncthing/lib/events"
  21. "github.com/syncthing/syncthing/lib/fs"
  22. "github.com/syncthing/syncthing/lib/protocol"
  23. "github.com/syncthing/syncthing/lib/rand"
  24. "github.com/syncthing/syncthing/lib/sha256"
  25. "github.com/syncthing/syncthing/lib/svcutil"
  26. "github.com/syncthing/syncthing/lib/sync"
  27. "github.com/syncthing/syncthing/lib/util"
  28. "github.com/thejerf/suture/v4"
  29. )
  30. const (
  31. // We set the bloom filter capacity to handle 100k individual items with
  32. // a false positive probability of 1% for the first pass. Once we know
  33. // how many items we have we will use that number instead, if it's more
  34. // than 100k. For fewer than 100k items we will just get better false
  35. // positive rate instead.
  36. indirectGCBloomCapacity = 100000
  37. indirectGCBloomFalsePositiveRate = 0.01 // 1%
  38. indirectGCBloomMaxBytes = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
  39. indirectGCDefaultInterval = 13 * time.Hour
  40. indirectGCTimeKey = "lastIndirectGCTime"
  41. // Use indirection for the block list when it exceeds this many entries
  42. blocksIndirectionCutoff = 3
  43. // Use indirection for the version vector when it exceeds this many entries
  44. versionIndirectionCutoff = 10
  45. recheckDefaultInterval = 30 * 24 * time.Hour
  46. needsRepairSuffix = ".needsrepair"
  47. )
  48. // Lowlevel is the lowest level database interface. It has a very simple
  49. // purpose: hold the actual backend database, and the in-memory state
  50. // that belong to that database. In the same way that a single on disk
  51. // database can only be opened once, there should be only one Lowlevel for
  52. // any given backend.
  53. type Lowlevel struct {
  54. *suture.Supervisor
  55. backend.Backend
  56. folderIdx *smallIndex
  57. deviceIdx *smallIndex
  58. keyer keyer
  59. gcMut sync.RWMutex
  60. gcKeyCount int
  61. indirectGCInterval time.Duration
  62. recheckInterval time.Duration
  63. oneFileSetCreated chan struct{}
  64. evLogger events.Logger
  65. blockFilter *bloomFilter
  66. versionFilter *bloomFilter
  67. }
  // NewLowlevel wraps the given backend in a Lowlevel, applies the options
// and registers the indirection GC runner on the embedded supervisor. If
// the database was marked as needing repair, the folder checks are run
// before returning; on failure the error is passed to handleFailure and
// returned.
func NewLowlevel(backend backend.Backend, evLogger events.Logger, opts ...Option) (*Lowlevel, error) {
	// Only log restarts in debug mode.
	spec := svcutil.SpecWithDebugLogger(l)
	db := &Lowlevel{
		Supervisor:         suture.New("db.Lowlevel", spec),
		Backend:            backend,
		folderIdx:          newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
		deviceIdx:          newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
		gcMut:              sync.NewRWMutex(),
		indirectGCInterval: indirectGCDefaultInterval,
		recheckInterval:    recheckDefaultInterval,
		oneFileSetCreated:  make(chan struct{}),
		evLogger:           evLogger,
	}
	for _, opt := range opts {
		opt(db)
	}
	db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)

	db.Add(svcutil.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))

	if path := db.needsRepairPath(); path != "" {
		if _, err := os.Lstat(path); err == nil {
			l.Infoln("Database was marked for repair - this may take a while")
			if err := db.checkRepair(); err != nil {
				db.handleFailure(err)
				return nil, err
			}
			// The repair succeeded, so clear the marker. Failing to do so
			// is not fatal, but the (possibly slow) repair would then run
			// again on every start, so make that visible.
			if err := os.Remove(path); err != nil {
				l.Warnln("Failed to remove database repair marker:", err)
			}
		}
	}
	return db, nil
}
  99. type Option func(*Lowlevel)
  100. // WithRecheckInterval sets the time interval in between metadata recalculations
  101. // and consistency checks.
  102. func WithRecheckInterval(dur time.Duration) Option {
  103. return func(db *Lowlevel) {
  104. if dur > 0 {
  105. db.recheckInterval = dur
  106. }
  107. }
  108. }
  // WithIndirectGCInterval sets the time interval in between indirection GC
// runs. Non-positive durations are ignored, keeping the default.
func WithIndirectGCInterval(dur time.Duration) Option {
	return func(db *Lowlevel) {
		if dur <= 0 {
			return
		}
		db.indirectGCInterval = dur
	}
}
  117. // ListFolders returns the list of folders currently in the database
  118. func (db *Lowlevel) ListFolders() []string {
  119. return db.folderIdx.Values()
  120. }
  121. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  122. // global versionlist and metadata.
  123. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  124. db.gcMut.RLock()
  125. defer db.gcMut.RUnlock()
  126. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  127. if err != nil {
  128. return err
  129. }
  130. defer t.close()
  131. var dk, gk, keyBuf []byte
  132. devID, err := protocol.DeviceIDFromBytes(device)
  133. if err != nil {
  134. return err
  135. }
  136. for _, f := range fs {
  137. name := []byte(f.Name)
  138. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  139. if err != nil {
  140. return err
  141. }
  142. ef, ok, err := t.getFileTrunc(dk, true)
  143. if err != nil {
  144. return err
  145. }
  146. if ok && unchanged(f, ef) {
  147. l.Debugf("not inserting unchanged (remote); folder=%q device=%v %v", folder, devID, f)
  148. continue
  149. }
  150. if ok {
  151. meta.removeFile(devID, ef)
  152. }
  153. meta.addFile(devID, f)
  154. l.Debugf("insert (remote); folder=%q device=%v %v", folder, devID, f)
  155. if err := t.putFile(dk, f); err != nil {
  156. return err
  157. }
  158. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  159. if err != nil {
  160. return err
  161. }
  162. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  163. if err != nil {
  164. return err
  165. }
  166. if err := t.Checkpoint(); err != nil {
  167. return err
  168. }
  169. }
  170. return t.Commit()
  171. }
  // updateLocalFiles adds fileinfos for the local device to the db, and
// updates the global version list, metadata, sequence index and block map
// buckets. Every stored file is assigned the next local sequence number;
// entries unchanged relative to what is already stored are skipped. The
// transaction is checkpointed after every inserted file.
func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
	db.gcMut.RLock()
	defer db.gcMut.RUnlock()

	t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
	if err != nil {
		return err
	}
	defer t.close()

	// dk, gk and keyBuf are key buffers reused across iterations; blockBuf
	// holds the big-endian block index written as block map value.
	var dk, gk, keyBuf []byte
	blockBuf := make([]byte, 4)
	for _, f := range fs {
		name := []byte(f.Name)
		dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
		if err != nil {
			return err
		}

		ef, ok, err := t.getFileByKey(dk)
		if err != nil {
			return err
		}

		if ok && unchanged(f, ef) {
			l.Debugf("not inserting unchanged (local); folder=%q %v", folder, f)
			continue
		}

		blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)

		if ok {
			// Remove the block map (and, if the block list changes, the
			// block list map) entries of the previous version.
			if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
				for _, block := range ef.Blocks {
					keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
					if err != nil {
						return err
					}
					if err := t.Delete(keyBuf); err != nil {
						return err
					}
				}
				if !blocksHashSame {
					// Assign rather than redeclare so the buffer is kept.
					keyBuf, err = db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
					if err != nil {
						return err
					}
					if err := t.Delete(keyBuf); err != nil {
						return err
					}
				}
			}

			// The old sequence entry is superseded by the new one below.
			keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
			if err != nil {
				return err
			}
			if err := t.Delete(keyBuf); err != nil {
				return err
			}
			l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
		}

		f.Sequence = meta.nextLocalSeq()

		if ok {
			meta.removeFile(protocol.LocalDeviceID, ef)
		}
		meta.addFile(protocol.LocalDeviceID, f)

		l.Debugf("insert (local); folder=%q %v", folder, f)
		if err := t.putFile(dk, f); err != nil {
			return err
		}

		gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
		if err != nil {
			return err
		}
		keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
		if err != nil {
			return err
		}

		keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
		if err != nil {
			return err
		}
		if err := t.Put(keyBuf, dk); err != nil {
			return err
		}
		l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)

		if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
			for i, block := range f.Blocks {
				binary.BigEndian.PutUint32(blockBuf, uint32(i))
				keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
				if err != nil {
					return err
				}
				if err := t.Put(keyBuf, blockBuf); err != nil {
					return err
				}
			}
			if !blocksHashSame {
				// Assign rather than redeclare so the buffer is kept.
				keyBuf, err = db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
				if err != nil {
					return err
				}
				if err := t.Put(keyBuf, nil); err != nil {
					return err
				}
			}
		}

		if err := t.Checkpoint(); err != nil {
			return err
		}
	}
	return t.Commit()
}
  281. func (db *Lowlevel) dropFolder(folder []byte) error {
  282. db.gcMut.RLock()
  283. defer db.gcMut.RUnlock()
  284. t, err := db.newReadWriteTransaction()
  285. if err != nil {
  286. return err
  287. }
  288. defer t.close()
  289. // Remove all items related to the given folder from the device->file bucket
  290. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  291. if err != nil {
  292. return err
  293. }
  294. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  295. return err
  296. }
  297. // Remove all sequences related to the folder
  298. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  299. if err != nil {
  300. return err
  301. }
  302. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  303. return err
  304. }
  305. // Remove all items related to the given folder from the global bucket
  306. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  307. if err != nil {
  308. return err
  309. }
  310. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  311. return err
  312. }
  313. // Remove all needs related to the folder
  314. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  315. if err != nil {
  316. return err
  317. }
  318. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  319. return err
  320. }
  321. // Remove the blockmap of the folder
  322. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  323. if err != nil {
  324. return err
  325. }
  326. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  327. return err
  328. }
  329. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  330. if err != nil {
  331. return err
  332. }
  333. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  334. return err
  335. }
  336. return t.Commit()
  337. }
  // dropDeviceFolder deletes every file entry of the given device in the
// given folder, removing each from the global version list. For the local
// device the folder's block map and block list map are dropped as well;
// only updateLocalFiles populates those.
func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
	db.gcMut.RLock()
	defer db.gcMut.RUnlock()

	t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
	if err != nil {
		return err
	}
	defer t.close()

	prefix, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
	if err != nil {
		return err
	}
	it, err := t.NewPrefixIterator(prefix)
	if err != nil {
		return err
	}
	defer it.Release()

	var globalKey, scratch []byte
	for it.Next() {
		name := db.keyer.NameFromDeviceFileKey(it.Key())
		if globalKey, err = db.keyer.GenerateGlobalVersionKey(globalKey, folder, name); err != nil {
			return err
		}
		if scratch, err = t.removeFromGlobal(globalKey, scratch, folder, device, name, meta); err != nil {
			return err
		}
		if err := t.Delete(it.Key()); err != nil {
			return err
		}
		if err := t.Checkpoint(); err != nil {
			return err
		}
	}
	it.Release()
	if err := it.Error(); err != nil {
		return err
	}

	if !bytes.Equal(device, protocol.LocalDeviceID[:]) {
		return t.Commit()
	}

	blockMapK, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
	if err != nil {
		return err
	}
	if err := t.deleteKeyPrefix(blockMapK.WithoutHashAndName()); err != nil {
		return err
	}
	blockListMapK, err := db.keyer.GenerateBlockListMapKey(blockMapK, folder, nil, nil)
	if err != nil {
		return err
	}
	if err := t.deleteKeyPrefix(blockListMapK.WithoutHashAndName()); err != nil {
		return err
	}
	return t.Commit()
}
  395. func (db *Lowlevel) checkGlobals(folderStr string) (int, error) {
  396. t, err := db.newReadWriteTransaction()
  397. if err != nil {
  398. return 0, err
  399. }
  400. defer t.close()
  401. folder := []byte(folderStr)
  402. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  403. if err != nil {
  404. return 0, err
  405. }
  406. dbi, err := t.NewPrefixIterator(key.WithoutName())
  407. if err != nil {
  408. return 0, err
  409. }
  410. defer dbi.Release()
  411. fixed := 0
  412. var dk []byte
  413. ro := t.readOnlyTransaction
  414. for dbi.Next() {
  415. var vl VersionList
  416. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  417. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  418. return 0, err
  419. }
  420. continue
  421. }
  422. // Check the global version list for consistency. An issue in previous
  423. // versions of goleveldb could result in reordered writes so that
  424. // there are global entries pointing to no longer existing files. Here
  425. // we find those and clear them out.
  426. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  427. newVL := &VersionList{}
  428. var changed, changedHere bool
  429. for _, fv := range vl.RawVersions {
  430. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  431. if err != nil {
  432. return 0, err
  433. }
  434. changed = changed || changedHere
  435. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  436. if err != nil {
  437. return 0, err
  438. }
  439. changed = changed || changedHere
  440. }
  441. if newVL.Empty() {
  442. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  443. return 0, err
  444. }
  445. fixed++
  446. } else if changed {
  447. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  448. return 0, err
  449. }
  450. fixed++
  451. }
  452. }
  453. dbi.Release()
  454. if err := dbi.Error(); err != nil {
  455. return 0, err
  456. }
  457. l.Debugf("global db check completed for %v", folder)
  458. return fixed, t.Commit()
  459. }
  460. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  461. var changed bool
  462. var err error
  463. for _, device := range devices {
  464. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  465. if err != nil {
  466. return false, err
  467. }
  468. f, ok, err := t.getFileTrunc(dk, false)
  469. if err != nil {
  470. return false, err
  471. }
  472. if !ok {
  473. changed = true
  474. continue
  475. }
  476. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  477. if err != nil {
  478. return false, err
  479. }
  480. }
  481. return changed, nil
  482. }
  483. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  484. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  485. if err != nil {
  486. return 0, err
  487. }
  488. cur, err := db.Get(key)
  489. if backend.IsNotFound(err) {
  490. return 0, nil
  491. } else if err != nil {
  492. return 0, err
  493. }
  494. var id protocol.IndexID
  495. if err := id.Unmarshal(cur); err != nil {
  496. return 0, nil
  497. }
  498. return id, nil
  499. }
  500. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  501. bs, _ := id.Marshal() // marshalling can't fail
  502. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  503. if err != nil {
  504. return err
  505. }
  506. return db.Put(key, bs)
  507. }
  // dropFolderIndexIDs deletes all index ID entries of the given folder, and
// also any index ID entry whose folder cannot be resolved from its key.
func (db *Lowlevel) dropFolderIndexIDs(folder []byte) error {
	t, err := db.newReadWriteTransaction()
	if err != nil {
		return err
	}
	defer t.close()

	belongsToFolder := func(key []byte) bool {
		keyFolder, ok := t.keyer.FolderFromIndexIDKey(key)
		if !ok {
			l.Debugf("Deleting IndexID with missing FolderIdx: %v", key)
			return true
		}
		return bytes.Equal(keyFolder, folder)
	}
	if err := t.deleteKeyPrefixMatching([]byte{KeyTypeIndexID}, belongsToFolder); err != nil {
		return err
	}
	return t.Commit()
}
  526. func (db *Lowlevel) dropIndexIDs() error {
  527. t, err := db.newReadWriteTransaction()
  528. if err != nil {
  529. return err
  530. }
  531. defer t.close()
  532. if err := t.deleteKeyPrefix([]byte{KeyTypeIndexID}); err != nil {
  533. return err
  534. }
  535. return t.Commit()
  536. }
  537. func (db *Lowlevel) dropMtimes(folder []byte) error {
  538. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  539. if err != nil {
  540. return err
  541. }
  542. return db.dropPrefix(key)
  543. }
  544. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  545. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  546. if err != nil {
  547. return err
  548. }
  549. return db.dropPrefix(key)
  550. }
  551. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  552. t, err := db.newReadWriteTransaction()
  553. if err != nil {
  554. return err
  555. }
  556. defer t.close()
  557. if err := t.deleteKeyPrefix(prefix); err != nil {
  558. return err
  559. }
  560. return t.Commit()
  561. }
  // gcRunner runs the indirection GC every indirectGCInterval until ctx is
// cancelled. The first run waits for the remainder of the interval since
// the last recorded run, but at least a minute, giving the system a while
// to get up and running (migrations and the like are better off running
// before GC). The run time is recorded whether or not the GC succeeded.
func (db *Lowlevel) gcRunner(ctx context.Context) error {
	wait := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
	if wait < time.Minute {
		wait = time.Minute
	}
	timer := time.NewTimer(wait)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
		}

		if err := db.gcIndirect(ctx); err != nil {
			l.Warnln("Database indirection GC failed:", err)
		}
		db.recordTime(indirectGCTimeKey)
		timer.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
	}
}
  // recordTime stores the current Unix time under key in the misc data
// namespace, which is what the next timeUntil call with the same key
// measures from. A failure to store it is ignored.
func (db *Lowlevel) recordTime(key string) {
	_ = NewMiscDataNamespace(db).PutInt64(key, time.Now().Unix()) // error wilfully ignored
}
  // timeUntil returns how long remains until every has passed since the time
// recorded under key, or zero if that point is already reached. Errors
// reading the recorded time are ignored.
func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
	last, _, _ := NewMiscDataNamespace(db).Int64(key) // error wilfully ignored
	if remaining := time.Until(time.Unix(last, 0).Add(every)); remaining > 0 {
		return remaining
	}
	return 0
}
  // gcIndirect removes indirected block lists and version vectors whose hash
// is not referenced by any FileInfo.
//
// The indirection GC uses bloom filters to track used block lists and
// versions. This means iterating over all items, adding their hashes to
// the filter, then iterating over the indirected items and removing
// those that don't match the filter. The filter will give false
// positives so we will keep around one percent of things that we don't
// really need (at most).
//
// Indirection GC needs to run when there are no modifications to the
// FileInfos or indirected items.
func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
	l.Debugln("Starting database GC")

	// Set up the bloom filters with the initial capacity and false positive
	// rate, or higher capacity if we've done this before and seen lots of
	// items. For simplicity's sake we track just one count, which is the
	// highest of the various indirected items. Holding gcMut guarantees that
	// no other modifications are happening while the filters are installed.
	db.gcMut.Lock()
	capacity := indirectGCBloomCapacity
	if db.gcKeyCount > capacity {
		capacity = db.gcKeyCount
	}
	db.blockFilter = newBloomFilter(capacity)
	db.versionFilter = newBloomFilter(capacity)
	db.gcMut.Unlock()

	defer func() {
		// Forget the bloom filters on the way out.
		db.gcMut.Lock()
		db.blockFilter = nil
		db.versionFilter = nil
		db.gcMut.Unlock()
	}()

	var discardedBlocks, matchedBlocks, discardedVersions, matchedVersions int

	t, err := db.newReadWriteTransaction()
	if err != nil {
		return err
	}
	defer t.Release()

	// Iterate the FileInfos, unmarshal the block and version hashes and
	// add them to the filter.
	//
	// This happens concurrently with normal database modifications, though
	// those modifications will now also add their blocks and versions to
	// the bloom filters.
	it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
	if err != nil {
		return err
	}
	defer it.Release()
	for it.Next() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		var hashes IndirectionHashesOnly
		if err := hashes.Unmarshal(it.Value()); err != nil {
			return err
		}
		db.recordIndirectionHashes(hashes)
	}
	it.Release()
	if err := it.Error(); err != nil {
		return err
	}

	// For the next phase we grab the GC lock again and hold it for the rest
	// of the method call. Now there can't be any further modifications to
	// the database or the bloom filters.
	db.gcMut.Lock()
	defer db.gcMut.Unlock()

	// Only print something if the process takes more than "a moment".
	logWait := make(chan struct{})
	logTimer := time.AfterFunc(10*time.Second, func() {
		l.Infoln("Database GC in progress - many Syncthing operations will be unresponsive until it's finished")
		close(logWait)
	})
	defer func() {
		if logTimer.Stop() {
			return
		}
		<-logWait // Make sure messages are sent in order.
		l.Infof("Database GC complete (discarded/remaining: %v/%v blocks, %v/%v versions)",
			discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
	}()

	// Iterate over block lists, removing keys with hashes that don't match
	// the filter.
	it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
	if err != nil {
		return err
	}
	defer it.Release()
	for it.Next() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		key := blockListKey(it.Key())
		if db.blockFilter.has(key.Hash()) {
			matchedBlocks++
			continue
		}
		if err := t.Delete(key); err != nil {
			return err
		}
		discardedBlocks++
	}
	it.Release()
	if err := it.Error(); err != nil {
		return err
	}

	// Iterate over version lists, removing keys with hashes that don't match
	// the filter.
	it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
	if err != nil {
		return err
	}
	// The earlier deferred Release calls are bound to the earlier
	// iterators, so this one needs its own to be released on the early
	// returns below (cancellation, delete errors).
	defer it.Release()
	for it.Next() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		key := versionKey(it.Key())
		if db.versionFilter.has(key.Hash()) {
			matchedVersions++
			continue
		}
		if err := t.Delete(key); err != nil {
			return err
		}
		discardedVersions++
	}
	it.Release()
	if err := it.Error(); err != nil {
		return err
	}

	// Remember the number of unique keys we kept until the next pass.
	db.gcKeyCount = matchedBlocks
	if matchedVersions > matchedBlocks {
		db.gcKeyCount = matchedVersions
	}

	if err := t.Commit(); err != nil {
		return err
	}

	l.Debugf("Finished GC (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
	return nil
}
  751. func (db *Lowlevel) recordIndirectionHashesForFile(f *protocol.FileInfo) {
  752. db.recordIndirectionHashes(IndirectionHashesOnly{BlocksHash: f.BlocksHash, VersionHash: f.VersionHash})
  753. }
  754. func (db *Lowlevel) recordIndirectionHashes(hs IndirectionHashesOnly) {
  755. // must be called with gcMut held (at least read-held)
  756. if db.blockFilter != nil && len(hs.BlocksHash) > 0 {
  757. db.blockFilter.add(hs.BlocksHash)
  758. }
  759. if db.versionFilter != nil && len(hs.VersionHash) > 0 {
  760. db.versionFilter.add(hs.VersionHash)
  761. }
  762. }
  // newBloomFilter creates a bloom filter for capacity items at the GC's
// false positive rate, limited to indirectGCBloomMaxBytes of memory and
// keyed with a fresh random SipHash key.
func newBloomFilter(capacity int) *bloomFilter {
	var key [16]byte
	// An error leaves (part of) the key zero, which only makes collisions
	// predictable; the filter still works, so it is ignored.
	_, _ = io.ReadFull(rand.Reader, key[:])

	cfg := blobloom.Config{
		Capacity: uint64(capacity),
		FPRate:   indirectGCBloomFalsePositiveRate,
		MaxBits:  8 * indirectGCBloomMaxBytes,
	}
	return &bloomFilter{
		f:  blobloom.NewSyncOptimized(cfg),
		k0: binary.LittleEndian.Uint64(key[:8]),
		k1: binary.LittleEndian.Uint64(key[8:]),
	}
}
  776. type bloomFilter struct {
  777. f *blobloom.SyncFilter
  778. k0, k1 uint64 // Random key for SipHash.
  779. }
  780. func (b *bloomFilter) add(id []byte) { b.f.Add(b.hash(id)) }
  781. func (b *bloomFilter) has(id []byte) bool { return b.f.Has(b.hash(id)) }
  782. // Hash function for the bloomfilter: SipHash of the SHA-256.
  783. //
  784. // The randomization in SipHash means we get different collisions across
  785. // runs and colliding keys are not kept indefinitely.
  786. func (b *bloomFilter) hash(id []byte) uint64 {
  787. if len(id) != sha256.Size {
  788. panic("bug: bloomFilter.hash passed something not a SHA256 hash")
  789. }
  790. return siphash.Hash(b.k0, b.k1, id)
  791. }
  792. // checkRepair checks folder metadata and sequences for miscellaneous errors.
  793. func (db *Lowlevel) checkRepair() error {
  794. db.gcMut.RLock()
  795. defer db.gcMut.RUnlock()
  796. for _, folder := range db.ListFolders() {
  797. if _, err := db.getMetaAndCheckGCLocked(folder); err != nil {
  798. return err
  799. }
  800. }
  801. return nil
  802. }
  803. func (db *Lowlevel) getMetaAndCheck(folder string) (*metadataTracker, error) {
  804. db.gcMut.RLock()
  805. defer db.gcMut.RUnlock()
  806. return db.getMetaAndCheckGCLocked(folder)
  807. }
  // getMetaAndCheckGCLocked repairs the folder's local need entries, global
// version lists and sequence entries, and recalculates its metadata. The
// metadata is recalculated a second time if sequences were repaired. It
// then compares the result with the previously stored metadata through
// checkSequencesUnchanged and returns the recalculated metadata. Callers
// hold gcMut (at least read-held).
func (db *Lowlevel) getMetaAndCheckGCLocked(folder string) (*metadataTracker, error) {
	n, err := db.checkLocalNeed([]byte(folder))
	if err != nil {
		return nil, fmt.Errorf("checking local need: %w", err)
	}
	if n != 0 {
		l.Infof("Repaired %d local need entries for folder %v in database", n, folder)
	}

	if n, err = db.checkGlobals(folder); err != nil {
		return nil, fmt.Errorf("checking globals: %w", err)
	}
	if n != 0 {
		l.Infof("Repaired %d global entries for folder %v in database", n, folder)
	}

	// The previously stored metadata, kept for the comparison at the end.
	prevMeta := newMetadataTracker(db.keyer, db.evLogger)
	_ = prevMeta.fromDB(db, []byte(folder)) // Ignore error, it leads to index id reset too

	meta, err := db.recalcMeta(folder)
	if err != nil {
		return nil, fmt.Errorf("recalculating metadata: %w", err)
	}

	if n, err = db.repairSequenceGCLocked(folder, meta); err != nil {
		return nil, fmt.Errorf("repairing sequences: %w", err)
	}
	if n != 0 {
		l.Infof("Repaired %d sequence entries for folder %v in database", n, folder)
		if meta, err = db.recalcMeta(folder); err != nil {
			return nil, fmt.Errorf("recalculating metadata: %w", err)
		}
	}

	if err := db.checkSequencesUnchanged(folder, prevMeta, meta); err != nil {
		return nil, fmt.Errorf("checking for changed sequences: %w", err)
	}
	return meta, nil
}
  // loadMetadataTracker returns the folder's stored metadata. It instead
// recalculates and checks it via getMetaAndCheck in three cases: the
// stored copy can't be loaded or is inconsistent; sequence entries exist
// beyond the stored local sequence (e.g. after a crash); or the stored
// copy is older than the recheck interval.
func (db *Lowlevel) loadMetadataTracker(folder string) (*metadataTracker, error) {
	meta := newMetadataTracker(db.keyer, db.evLogger)
	if err := meta.fromDB(db, []byte(folder)); err != nil {
		// errors.Is also recognizes the sentinel when it has been wrapped.
		if errors.Is(err, errMetaInconsistent) {
			l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
		} else {
			l.Infof("No stored folder metadata for %q; recalculating", folder)
		}
		return db.getMetaAndCheck(folder)
	}

	curSeq := meta.Sequence(protocol.LocalDeviceID)
	metaOK, err := db.verifyLocalSequence(curSeq, folder)
	if err != nil {
		return nil, fmt.Errorf("verifying sequences: %w", err)
	}
	if !metaOK {
		l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
		return db.getMetaAndCheck(folder)
	}

	if age := time.Since(meta.Created()); age > db.recheckInterval {
		l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, util.NiceDurationString(age))
		return db.getMetaAndCheck(folder)
	}

	return meta, nil
}
  868. func (db *Lowlevel) recalcMeta(folderStr string) (*metadataTracker, error) {
  869. folder := []byte(folderStr)
  870. meta := newMetadataTracker(db.keyer, db.evLogger)
  871. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  872. if err != nil {
  873. return nil, err
  874. }
  875. defer t.close()
  876. var deviceID protocol.DeviceID
  877. err = t.withAllFolderTruncated(folder, func(device []byte, f FileInfoTruncated) bool {
  878. copy(deviceID[:], device)
  879. meta.addFile(deviceID, f)
  880. return true
  881. })
  882. if err != nil {
  883. return nil, err
  884. }
  885. err = t.withGlobal(folder, nil, true, func(f protocol.FileIntf) bool {
  886. meta.addFile(protocol.GlobalDeviceID, f)
  887. return true
  888. })
  889. if err != nil {
  890. return nil, err
  891. }
  892. meta.emptyNeeded(protocol.LocalDeviceID)
  893. err = t.withNeed(folder, protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  894. meta.addNeeded(protocol.LocalDeviceID, f)
  895. return true
  896. })
  897. if err != nil {
  898. return nil, err
  899. }
  900. for _, device := range meta.devices() {
  901. meta.emptyNeeded(device)
  902. err = t.withNeed(folder, device[:], true, func(f protocol.FileIntf) bool {
  903. meta.addNeeded(device, f)
  904. return true
  905. })
  906. if err != nil {
  907. return nil, err
  908. }
  909. }
  910. meta.SetCreated()
  911. if err := t.Commit(); err != nil {
  912. return nil, err
  913. }
  914. return meta, nil
  915. }
  916. // Verify the local sequence number from actual sequence entries. Returns
  917. // true if it was all good, or false if a fixup was necessary.
  918. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) (bool, error) {
  919. // Walk the sequence index from the current (supposedly) highest
  920. // sequence number and raise the alarm if we get anything. This recovers
  921. // from the occasion where we have written sequence entries to disk but
  922. // not yet written new metadata to disk.
  923. //
  924. // Note that we can have the same thing happen for remote devices but
  925. // there it's not a problem -- we'll simply advertise a lower sequence
  926. // number than we've actually seen and receive some duplicate updates
  927. // and then be in sync again.
  928. t, err := db.newReadOnlyTransaction()
  929. if err != nil {
  930. return false, err
  931. }
  932. ok := true
  933. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  934. ok = false // we got something, which we should not have
  935. return false
  936. }); err != nil {
  937. return false, err
  938. }
  939. t.close()
  940. return ok, nil
  941. }
  942. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  943. // match those in the corresponding file entries. It returns the amount of fixed
  944. // entries.
  945. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  946. t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
  947. if err != nil {
  948. return 0, err
  949. }
  950. defer t.close()
  951. fixed := 0
  952. folder := []byte(folderStr)
  953. // First check that every file entry has a matching sequence entry
  954. // (this was previously db schema upgrade to 9).
  955. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  956. if err != nil {
  957. return 0, err
  958. }
  959. it, err := t.NewPrefixIterator(dk.WithoutName())
  960. if err != nil {
  961. return 0, err
  962. }
  963. defer it.Release()
  964. var sk sequenceKey
  965. for it.Next() {
  966. intf, err := t.unmarshalTrunc(it.Value(), false)
  967. if err != nil {
  968. // Delete local items with invalid indirected blocks/versions.
  969. // They will be rescanned.
  970. var ierr *blocksIndirectionError
  971. if ok := errors.As(err, &ierr); ok && backend.IsNotFound(err) {
  972. intf, err = t.unmarshalTrunc(it.Value(), true)
  973. if err != nil {
  974. return 0, err
  975. }
  976. name := []byte(intf.FileName())
  977. gk, err := t.keyer.GenerateGlobalVersionKey(nil, folder, name)
  978. if err != nil {
  979. return 0, err
  980. }
  981. _, err = t.removeFromGlobal(gk, nil, folder, protocol.LocalDeviceID[:], name, nil)
  982. if err != nil {
  983. return 0, err
  984. }
  985. sk, err = db.keyer.GenerateSequenceKey(sk, folder, intf.SequenceNo())
  986. if err != nil {
  987. return 0, err
  988. }
  989. if err := t.Delete(sk); err != nil {
  990. return 0, err
  991. }
  992. if err := t.Delete(it.Key()); err != nil {
  993. return 0, err
  994. }
  995. }
  996. return 0, err
  997. }
  998. fi := intf.(protocol.FileInfo)
  999. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  1000. return 0, err
  1001. }
  1002. switch dk, err = t.Get(sk); {
  1003. case err != nil:
  1004. if !backend.IsNotFound(err) {
  1005. return 0, err
  1006. }
  1007. fallthrough
  1008. case !bytes.Equal(it.Key(), dk):
  1009. fixed++
  1010. fi.Sequence = meta.nextLocalSeq()
  1011. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  1012. return 0, err
  1013. }
  1014. if err := t.Put(sk, it.Key()); err != nil {
  1015. return 0, err
  1016. }
  1017. if err := t.putFile(it.Key(), fi); err != nil {
  1018. return 0, err
  1019. }
  1020. }
  1021. if err := t.Checkpoint(); err != nil {
  1022. return 0, err
  1023. }
  1024. }
  1025. if err := it.Error(); err != nil {
  1026. return 0, err
  1027. }
  1028. it.Release()
  1029. // Secondly check there's no sequence entries pointing at incorrect things.
  1030. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  1031. if err != nil {
  1032. return 0, err
  1033. }
  1034. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  1035. if err != nil {
  1036. return 0, err
  1037. }
  1038. defer it.Release()
  1039. for it.Next() {
  1040. // Check that the sequence from the key matches the
  1041. // sequence in the file.
  1042. fi, ok, err := t.getFileTrunc(it.Value(), true)
  1043. if err != nil {
  1044. return 0, err
  1045. }
  1046. if ok {
  1047. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  1048. continue
  1049. }
  1050. }
  1051. // Either the file is missing or has a different sequence number
  1052. fixed++
  1053. if err := t.Delete(it.Key()); err != nil {
  1054. return 0, err
  1055. }
  1056. }
  1057. if err := it.Error(); err != nil {
  1058. return 0, err
  1059. }
  1060. it.Release()
  1061. return fixed, t.Commit()
  1062. }
  1063. // Does not take care of metadata - if anything is repaired, the need count
  1064. // needs to be recalculated.
  1065. func (db *Lowlevel) checkLocalNeed(folder []byte) (int, error) {
  1066. repaired := 0
  1067. t, err := db.newReadWriteTransaction()
  1068. if err != nil {
  1069. return 0, err
  1070. }
  1071. defer t.close()
  1072. key, err := t.keyer.GenerateNeedFileKey(nil, folder, nil)
  1073. if err != nil {
  1074. return 0, err
  1075. }
  1076. dbi, err := t.NewPrefixIterator(key.WithoutName())
  1077. if err != nil {
  1078. return 0, err
  1079. }
  1080. defer dbi.Release()
  1081. var needName string
  1082. var needDone bool
  1083. next := func() {
  1084. needDone = !dbi.Next()
  1085. if !needDone {
  1086. needName = string(t.keyer.NameFromGlobalVersionKey(dbi.Key()))
  1087. }
  1088. }
  1089. next()
  1090. t.withNeedIteratingGlobal(folder, protocol.LocalDeviceID[:], true, func(fi protocol.FileIntf) bool {
  1091. f := fi.(FileInfoTruncated)
  1092. for !needDone && needName < f.Name {
  1093. repaired++
  1094. if err = t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  1095. return false
  1096. }
  1097. l.Debugln("check local need: removing", needName)
  1098. next()
  1099. }
  1100. if needName == f.Name {
  1101. next()
  1102. } else {
  1103. repaired++
  1104. key, err = t.keyer.GenerateNeedFileKey(key, folder, []byte(f.Name))
  1105. if err != nil {
  1106. return false
  1107. }
  1108. if err = t.Put(key, nil); err != nil {
  1109. return false
  1110. }
  1111. l.Debugln("check local need: adding", f.Name)
  1112. }
  1113. return true
  1114. })
  1115. if err != nil {
  1116. return 0, err
  1117. }
  1118. for !needDone {
  1119. repaired++
  1120. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  1121. return 0, err
  1122. }
  1123. l.Debugln("check local need: removing", needName)
  1124. next()
  1125. }
  1126. if err := dbi.Error(); err != nil {
  1127. return 0, err
  1128. }
  1129. dbi.Release()
  1130. if err = t.Commit(); err != nil {
  1131. return 0, err
  1132. }
  1133. return repaired, nil
  1134. }
  1135. // checkSequencesUnchanged resets delta indexes for any device where the
  1136. // sequence changed.
  1137. func (db *Lowlevel) checkSequencesUnchanged(folder string, oldMeta, meta *metadataTracker) error {
  1138. t, err := db.newReadWriteTransaction()
  1139. if err != nil {
  1140. return err
  1141. }
  1142. defer t.close()
  1143. var key []byte
  1144. deleteIndexID := func(devID protocol.DeviceID) error {
  1145. key, err = db.keyer.GenerateIndexIDKey(key, devID[:], []byte(folder))
  1146. if err != nil {
  1147. return err
  1148. }
  1149. return t.Delete(key)
  1150. }
  1151. if oldMeta.Sequence(protocol.LocalDeviceID) != meta.Sequence(protocol.LocalDeviceID) {
  1152. if err := deleteIndexID(protocol.LocalDeviceID); err != nil {
  1153. return err
  1154. }
  1155. l.Infof("Local sequence for folder %v changed while repairing - dropping delta indexes", folder)
  1156. }
  1157. oldDevices := oldMeta.devices()
  1158. oldSequences := make(map[protocol.DeviceID]int64, len(oldDevices))
  1159. for _, devID := range oldDevices {
  1160. oldSequences[devID] = oldMeta.Sequence(devID)
  1161. }
  1162. for _, devID := range meta.devices() {
  1163. oldSeq := oldSequences[devID]
  1164. delete(oldSequences, devID)
  1165. // A lower sequence number just means we will receive some indexes again.
  1166. if oldSeq >= meta.Sequence(devID) {
  1167. if oldSeq > meta.Sequence(devID) {
  1168. db.evLogger.Log(events.Failure, "lower remote sequence after recalculating metadata")
  1169. }
  1170. continue
  1171. }
  1172. db.evLogger.Log(events.Failure, "higher remote sequence after recalculating metadata")
  1173. if err := deleteIndexID(devID); err != nil {
  1174. return err
  1175. }
  1176. l.Infof("Sequence of device %v for folder %v changed while repairing - dropping delta indexes", devID.Short(), folder)
  1177. }
  1178. for devID := range oldSequences {
  1179. if err := deleteIndexID(devID); err != nil {
  1180. return err
  1181. }
  1182. l.Debugf("Removed indexID of device %v for folder %v which isn't present anymore", devID.Short(), folder)
  1183. }
  1184. return t.Commit()
  1185. }
  1186. func (db *Lowlevel) needsRepairPath() string {
  1187. path := db.Location()
  1188. if path == "" {
  1189. return ""
  1190. }
  1191. if path[len(path)-1] == fs.PathSeparator {
  1192. path = path[:len(path)-1]
  1193. }
  1194. return path + needsRepairSuffix
  1195. }
  1196. func (db *Lowlevel) checkErrorForRepair(err error) {
  1197. if errors.Is(err, errEntryFromGlobalMissing) || errors.Is(err, errEmptyGlobal) {
  1198. // Inconsistency error, mark db for repair on next start.
  1199. if path := db.needsRepairPath(); path != "" {
  1200. if fd, err := os.Create(path); err == nil {
  1201. fd.Close()
  1202. }
  1203. }
  1204. }
  1205. }
  1206. // unchanged checks if two files are the same and thus don't need to be updated.
  1207. // Local flags or the invalid bit might change without the version
  1208. // being bumped.
  1209. func unchanged(nf, ef protocol.FileIntf) bool {
  1210. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  1211. }
  1212. func (db *Lowlevel) handleFailure(err error) {
  1213. db.checkErrorForRepair(err)
  1214. if shouldReportFailure(err) {
  1215. db.evLogger.Log(events.Failure, err.Error())
  1216. }
  1217. }
  1218. var ldbPathRe = regexp.MustCompile(`(open|write|read) .+[\\/].+[\\/]index[^\\/]+[\\/][^\\/]+: `)
  1219. func shouldReportFailure(err error) bool {
  1220. return !ldbPathRe.MatchString(err.Error())
  1221. }