lowlevel.go 29 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "fmt"
  12. "io"
  13. "os"
  14. "regexp"
  15. "time"
  16. "github.com/dchest/siphash"
  17. "github.com/greatroar/blobloom"
  18. "github.com/syncthing/syncthing/lib/db/backend"
  19. "github.com/syncthing/syncthing/lib/fs"
  20. "github.com/syncthing/syncthing/lib/protocol"
  21. "github.com/syncthing/syncthing/lib/rand"
  22. "github.com/syncthing/syncthing/lib/sha256"
  23. "github.com/syncthing/syncthing/lib/sync"
  24. "github.com/syncthing/syncthing/lib/util"
  25. "github.com/thejerf/suture/v4"
  26. )
  27. const (
  28. // We set the bloom filter capacity to handle 100k individual items with
  29. // a false positive probability of 1% for the first pass. Once we know
  30. // how many items we have we will use that number instead, if it's more
  31. // than 100k. For fewer than 100k items we will just get better false
  32. // positive rate instead.
  33. indirectGCBloomCapacity = 100000
  34. indirectGCBloomFalsePositiveRate = 0.01 // 1%
  35. indirectGCBloomMaxBytes = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
  36. indirectGCDefaultInterval = 13 * time.Hour
  37. indirectGCTimeKey = "lastIndirectGCTime"
  38. // Use indirection for the block list when it exceeds this many entries
  39. blocksIndirectionCutoff = 3
  40. // Use indirection for the version vector when it exceeds this many entries
  41. versionIndirectionCutoff = 10
  42. recheckDefaultInterval = 30 * 24 * time.Hour
  43. needsRepairSuffix = ".needsrepair"
  44. )
  45. // Lowlevel is the lowest level database interface. It has a very simple
  46. // purpose: hold the actual backend database, and the in-memory state
  47. // that belong to that database. In the same way that a single on disk
  48. // database can only be opened once, there should be only one Lowlevel for
  49. // any given backend.
  50. type Lowlevel struct {
  51. *suture.Supervisor
  52. backend.Backend
  53. folderIdx *smallIndex
  54. deviceIdx *smallIndex
  55. keyer keyer
  56. gcMut sync.RWMutex
  57. gcKeyCount int
  58. indirectGCInterval time.Duration
  59. recheckInterval time.Duration
  60. }
  61. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  62. // Only log restarts in debug mode.
  63. spec := util.SpecWithDebugLogger(l)
  64. db := &Lowlevel{
  65. Supervisor: suture.New("db.Lowlevel", spec),
  66. Backend: backend,
  67. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  68. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  69. gcMut: sync.NewRWMutex(),
  70. indirectGCInterval: indirectGCDefaultInterval,
  71. recheckInterval: recheckDefaultInterval,
  72. }
  73. for _, opt := range opts {
  74. opt(db)
  75. }
  76. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  77. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  78. if path := db.needsRepairPath(); path != "" {
  79. if _, err := os.Lstat(path); err == nil {
  80. l.Infoln("Database was marked for repair - this may take a while")
  81. db.checkRepair()
  82. os.Remove(path)
  83. }
  84. }
  85. return db
  86. }
  87. type Option func(*Lowlevel)
  88. // WithRecheckInterval sets the time interval in between metadata recalculations
  89. // and consistency checks.
  90. func WithRecheckInterval(dur time.Duration) Option {
  91. return func(db *Lowlevel) {
  92. if dur > 0 {
  93. db.recheckInterval = dur
  94. }
  95. }
  96. }
  97. // WithIndirectGCInterval sets the time interval in between GC runs.
  98. func WithIndirectGCInterval(dur time.Duration) Option {
  99. return func(db *Lowlevel) {
  100. if dur > 0 {
  101. db.indirectGCInterval = dur
  102. }
  103. }
  104. }
  105. // ListFolders returns the list of folders currently in the database
  106. func (db *Lowlevel) ListFolders() []string {
  107. return db.folderIdx.Values()
  108. }
  109. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  110. // global versionlist and metadata.
  111. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  112. db.gcMut.RLock()
  113. defer db.gcMut.RUnlock()
  114. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  115. if err != nil {
  116. return err
  117. }
  118. defer t.close()
  119. var dk, gk, keyBuf []byte
  120. devID, err := protocol.DeviceIDFromBytes(device)
  121. if err != nil {
  122. return err
  123. }
  124. for _, f := range fs {
  125. name := []byte(f.Name)
  126. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  127. if err != nil {
  128. return err
  129. }
  130. ef, ok, err := t.getFileTrunc(dk, true)
  131. if err != nil {
  132. return err
  133. }
  134. if ok && unchanged(f, ef) {
  135. l.Debugf("not inserting unchanged (remote); folder=%q device=%v %v", folder, devID, f)
  136. continue
  137. }
  138. if ok {
  139. meta.removeFile(devID, ef)
  140. }
  141. meta.addFile(devID, f)
  142. l.Debugf("insert (remote); folder=%q device=%v %v", folder, devID, f)
  143. if err := t.putFile(dk, f); err != nil {
  144. return err
  145. }
  146. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  147. if err != nil {
  148. return err
  149. }
  150. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  151. if err != nil {
  152. return err
  153. }
  154. if err := t.Checkpoint(); err != nil {
  155. return err
  156. }
  157. }
  158. return t.Commit()
  159. }
  160. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  161. // metadata, sequence and blockmap buckets.
  162. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  163. db.gcMut.RLock()
  164. defer db.gcMut.RUnlock()
  165. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  166. if err != nil {
  167. return err
  168. }
  169. defer t.close()
  170. var dk, gk, keyBuf []byte
  171. blockBuf := make([]byte, 4)
  172. for _, f := range fs {
  173. name := []byte(f.Name)
  174. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  175. if err != nil {
  176. return err
  177. }
  178. ef, ok, err := t.getFileByKey(dk)
  179. if err != nil {
  180. return err
  181. }
  182. if ok && unchanged(f, ef) {
  183. l.Debugf("not inserting unchanged (local); folder=%q %v", folder, f)
  184. continue
  185. }
  186. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  187. if ok {
  188. if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
  189. for _, block := range ef.Blocks {
  190. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  191. if err != nil {
  192. return err
  193. }
  194. if err := t.Delete(keyBuf); err != nil {
  195. return err
  196. }
  197. }
  198. if !blocksHashSame {
  199. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  200. if err != nil {
  201. return err
  202. }
  203. if err = t.Delete(keyBuf); err != nil {
  204. return err
  205. }
  206. }
  207. }
  208. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  209. if err != nil {
  210. return err
  211. }
  212. if err := t.Delete(keyBuf); err != nil {
  213. return err
  214. }
  215. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  216. }
  217. f.Sequence = meta.nextLocalSeq()
  218. if ok {
  219. meta.removeFile(protocol.LocalDeviceID, ef)
  220. }
  221. meta.addFile(protocol.LocalDeviceID, f)
  222. l.Debugf("insert (local); folder=%q %v", folder, f)
  223. if err := t.putFile(dk, f); err != nil {
  224. return err
  225. }
  226. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  227. if err != nil {
  228. return err
  229. }
  230. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  231. if err != nil {
  232. return err
  233. }
  234. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  235. if err != nil {
  236. return err
  237. }
  238. if err := t.Put(keyBuf, dk); err != nil {
  239. return err
  240. }
  241. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  242. if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
  243. for i, block := range f.Blocks {
  244. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  245. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  246. if err != nil {
  247. return err
  248. }
  249. if err := t.Put(keyBuf, blockBuf); err != nil {
  250. return err
  251. }
  252. }
  253. if !blocksHashSame {
  254. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  255. if err != nil {
  256. return err
  257. }
  258. if err = t.Put(keyBuf, nil); err != nil {
  259. return err
  260. }
  261. }
  262. }
  263. if err := t.Checkpoint(); err != nil {
  264. return err
  265. }
  266. }
  267. return t.Commit()
  268. }
  269. func (db *Lowlevel) dropFolder(folder []byte) error {
  270. db.gcMut.RLock()
  271. defer db.gcMut.RUnlock()
  272. t, err := db.newReadWriteTransaction()
  273. if err != nil {
  274. return err
  275. }
  276. defer t.close()
  277. // Remove all items related to the given folder from the device->file bucket
  278. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  279. if err != nil {
  280. return err
  281. }
  282. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  283. return err
  284. }
  285. // Remove all sequences related to the folder
  286. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  287. if err != nil {
  288. return err
  289. }
  290. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  291. return err
  292. }
  293. // Remove all items related to the given folder from the global bucket
  294. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  295. if err != nil {
  296. return err
  297. }
  298. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  299. return err
  300. }
  301. // Remove all needs related to the folder
  302. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  303. if err != nil {
  304. return err
  305. }
  306. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  307. return err
  308. }
  309. // Remove the blockmap of the folder
  310. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  311. if err != nil {
  312. return err
  313. }
  314. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  315. return err
  316. }
  317. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  318. if err != nil {
  319. return err
  320. }
  321. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  322. return err
  323. }
  324. return t.Commit()
  325. }
  326. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  327. db.gcMut.RLock()
  328. defer db.gcMut.RUnlock()
  329. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  330. if err != nil {
  331. return err
  332. }
  333. defer t.close()
  334. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  335. if err != nil {
  336. return err
  337. }
  338. dbi, err := t.NewPrefixIterator(key)
  339. if err != nil {
  340. return err
  341. }
  342. defer dbi.Release()
  343. var gk, keyBuf []byte
  344. for dbi.Next() {
  345. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  346. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  347. if err != nil {
  348. return err
  349. }
  350. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  351. if err != nil {
  352. return err
  353. }
  354. if err := t.Delete(dbi.Key()); err != nil {
  355. return err
  356. }
  357. if err := t.Checkpoint(); err != nil {
  358. return err
  359. }
  360. }
  361. dbi.Release()
  362. if err := dbi.Error(); err != nil {
  363. return err
  364. }
  365. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  366. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  367. if err != nil {
  368. return err
  369. }
  370. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  371. return err
  372. }
  373. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  374. if err != nil {
  375. return err
  376. }
  377. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  378. return err
  379. }
  380. }
  381. return t.Commit()
  382. }
  383. func (db *Lowlevel) checkGlobals(folder []byte) error {
  384. t, err := db.newReadWriteTransaction()
  385. if err != nil {
  386. return err
  387. }
  388. defer t.close()
  389. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  390. if err != nil {
  391. return err
  392. }
  393. dbi, err := t.NewPrefixIterator(key.WithoutName())
  394. if err != nil {
  395. return err
  396. }
  397. defer dbi.Release()
  398. var dk []byte
  399. ro := t.readOnlyTransaction
  400. for dbi.Next() {
  401. var vl VersionList
  402. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  403. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  404. return err
  405. }
  406. continue
  407. }
  408. // Check the global version list for consistency. An issue in previous
  409. // versions of goleveldb could result in reordered writes so that
  410. // there are global entries pointing to no longer existing files. Here
  411. // we find those and clear them out.
  412. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  413. newVL := &VersionList{}
  414. var changed, changedHere bool
  415. for _, fv := range vl.RawVersions {
  416. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  417. if err != nil {
  418. return err
  419. }
  420. changed = changed || changedHere
  421. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  422. if err != nil {
  423. return err
  424. }
  425. changed = changed || changedHere
  426. }
  427. if newVL.Empty() {
  428. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  429. return err
  430. }
  431. } else if changed {
  432. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  433. return err
  434. }
  435. }
  436. }
  437. dbi.Release()
  438. if err := dbi.Error(); err != nil {
  439. return err
  440. }
  441. l.Debugf("db check completed for %q", folder)
  442. return t.Commit()
  443. }
  444. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  445. var changed bool
  446. var err error
  447. for _, device := range devices {
  448. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  449. if err != nil {
  450. return false, err
  451. }
  452. f, ok, err := t.getFileTrunc(dk, false)
  453. if err != nil {
  454. return false, err
  455. }
  456. if !ok {
  457. changed = true
  458. continue
  459. }
  460. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  461. if err != nil {
  462. return false, err
  463. }
  464. }
  465. return changed, nil
  466. }
  467. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  468. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  469. if err != nil {
  470. return 0, err
  471. }
  472. cur, err := db.Get(key)
  473. if backend.IsNotFound(err) {
  474. return 0, nil
  475. } else if err != nil {
  476. return 0, err
  477. }
  478. var id protocol.IndexID
  479. if err := id.Unmarshal(cur); err != nil {
  480. return 0, nil
  481. }
  482. return id, nil
  483. }
  484. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  485. bs, _ := id.Marshal() // marshalling can't fail
  486. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  487. if err != nil {
  488. return err
  489. }
  490. return db.Put(key, bs)
  491. }
  492. func (db *Lowlevel) dropMtimes(folder []byte) error {
  493. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  494. if err != nil {
  495. return err
  496. }
  497. return db.dropPrefix(key)
  498. }
  499. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  500. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  501. if err != nil {
  502. return err
  503. }
  504. return db.dropPrefix(key)
  505. }
  506. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  507. t, err := db.newReadWriteTransaction()
  508. if err != nil {
  509. return err
  510. }
  511. defer t.close()
  512. if err := t.deleteKeyPrefix(prefix); err != nil {
  513. return err
  514. }
  515. return t.Commit()
  516. }
  517. func (db *Lowlevel) gcRunner(ctx context.Context) error {
  518. // Calculate the time for the next GC run. Even if we should run GC
  519. // directly, give the system a while to get up and running and do other
  520. // stuff first. (We might have migrations and stuff which would be
  521. // better off running before GC.)
  522. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  523. if next < time.Minute {
  524. next = time.Minute
  525. }
  526. t := time.NewTimer(next)
  527. defer t.Stop()
  528. for {
  529. select {
  530. case <-ctx.Done():
  531. return ctx.Err()
  532. case <-t.C:
  533. if err := db.gcIndirect(ctx); err != nil {
  534. l.Warnln("Database indirection GC failed:", err)
  535. }
  536. db.recordTime(indirectGCTimeKey)
  537. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  538. }
  539. }
  540. }
  541. // recordTime records the current time under the given key, affecting the
  542. // next call to timeUntil with the same key.
  543. func (db *Lowlevel) recordTime(key string) {
  544. miscDB := NewMiscDataNamespace(db)
  545. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  546. }
  547. // timeUntil returns how long we should wait until the next interval, or
  548. // zero if it should happen directly.
  549. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  550. miscDB := NewMiscDataNamespace(db)
  551. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  552. nextTime := time.Unix(lastTime, 0).Add(every)
  553. sleepTime := time.Until(nextTime)
  554. if sleepTime < 0 {
  555. sleepTime = 0
  556. }
  557. return sleepTime
  558. }
  559. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  560. // The indirection GC uses bloom filters to track used block lists and
  561. // versions. This means iterating over all items, adding their hashes to
  562. // the filter, then iterating over the indirected items and removing
  563. // those that don't match the filter. The filter will give false
  564. // positives so we will keep around one percent of things that we don't
  565. // really need (at most).
  566. //
  567. // Indirection GC needs to run when there are no modifications to the
  568. // FileInfos or indirected items.
  569. db.gcMut.Lock()
  570. defer db.gcMut.Unlock()
  571. t, err := db.newReadWriteTransaction()
  572. if err != nil {
  573. return err
  574. }
  575. defer t.Release()
  576. // Set up the bloom filters with the initial capacity and false positive
  577. // rate, or higher capacity if we've done this before and seen lots of
  578. // items. For simplicity's sake we track just one count, which is the
  579. // highest of the various indirected items.
  580. capacity := indirectGCBloomCapacity
  581. if db.gcKeyCount > capacity {
  582. capacity = db.gcKeyCount
  583. }
  584. blockFilter := newBloomFilter(capacity)
  585. versionFilter := newBloomFilter(capacity)
  586. // Iterate the FileInfos, unmarshal the block and version hashes and
  587. // add them to the filter.
  588. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  589. if err != nil {
  590. return err
  591. }
  592. defer it.Release()
  593. for it.Next() {
  594. select {
  595. case <-ctx.Done():
  596. return ctx.Err()
  597. default:
  598. }
  599. var hashes IndirectionHashesOnly
  600. if err := hashes.Unmarshal(it.Value()); err != nil {
  601. return err
  602. }
  603. if len(hashes.BlocksHash) > 0 {
  604. blockFilter.add(hashes.BlocksHash)
  605. }
  606. if len(hashes.VersionHash) > 0 {
  607. versionFilter.add(hashes.VersionHash)
  608. }
  609. }
  610. it.Release()
  611. if err := it.Error(); err != nil {
  612. return err
  613. }
  614. // Iterate over block lists, removing keys with hashes that don't match
  615. // the filter.
  616. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  617. if err != nil {
  618. return err
  619. }
  620. defer it.Release()
  621. matchedBlocks := 0
  622. for it.Next() {
  623. select {
  624. case <-ctx.Done():
  625. return ctx.Err()
  626. default:
  627. }
  628. key := blockListKey(it.Key())
  629. if blockFilter.has(key.Hash()) {
  630. matchedBlocks++
  631. continue
  632. }
  633. if err := t.Delete(key); err != nil {
  634. return err
  635. }
  636. }
  637. it.Release()
  638. if err := it.Error(); err != nil {
  639. return err
  640. }
  641. // Iterate over version lists, removing keys with hashes that don't match
  642. // the filter.
  643. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  644. if err != nil {
  645. return err
  646. }
  647. matchedVersions := 0
  648. for it.Next() {
  649. select {
  650. case <-ctx.Done():
  651. return ctx.Err()
  652. default:
  653. }
  654. key := versionKey(it.Key())
  655. if versionFilter.has(key.Hash()) {
  656. matchedVersions++
  657. continue
  658. }
  659. if err := t.Delete(key); err != nil {
  660. return err
  661. }
  662. }
  663. it.Release()
  664. if err := it.Error(); err != nil {
  665. return err
  666. }
  667. // Remember the number of unique keys we kept until the next pass.
  668. db.gcKeyCount = matchedBlocks
  669. if matchedVersions > matchedBlocks {
  670. db.gcKeyCount = matchedVersions
  671. }
  672. if err := t.Commit(); err != nil {
  673. return err
  674. }
  675. return db.Compact()
  676. }
  677. func newBloomFilter(capacity int) bloomFilter {
  678. var buf [16]byte
  679. io.ReadFull(rand.Reader, buf[:])
  680. return bloomFilter{
  681. f: blobloom.NewOptimized(blobloom.Config{
  682. Capacity: uint64(capacity),
  683. FPRate: indirectGCBloomFalsePositiveRate,
  684. MaxBits: 8 * indirectGCBloomMaxBytes,
  685. }),
  686. k0: binary.LittleEndian.Uint64(buf[:8]),
  687. k1: binary.LittleEndian.Uint64(buf[8:]),
  688. }
  689. }
  690. type bloomFilter struct {
  691. f *blobloom.Filter
  692. k0, k1 uint64 // Random key for SipHash.
  693. }
  694. func (b *bloomFilter) add(id []byte) { b.f.Add(b.hash(id)) }
  695. func (b *bloomFilter) has(id []byte) bool { return b.f.Has(b.hash(id)) }
  696. // Hash function for the bloomfilter: SipHash of the SHA-256.
  697. //
  698. // The randomization in SipHash means we get different collisions across
  699. // runs and colliding keys are not kept indefinitely.
  700. func (b *bloomFilter) hash(id []byte) uint64 {
  701. if len(id) != sha256.Size {
  702. panic("bug: bloomFilter.hash passed something not a SHA256 hash")
  703. }
  704. return siphash.Hash(b.k0, b.k1, id)
  705. }
  706. // checkRepair checks folder metadata and sequences for miscellaneous errors.
  707. func (db *Lowlevel) checkRepair() {
  708. for _, folder := range db.ListFolders() {
  709. _ = db.getMetaAndCheck(folder)
  710. }
  711. }
  712. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  713. db.gcMut.RLock()
  714. defer db.gcMut.RUnlock()
  715. var err error
  716. defer func() {
  717. if err != nil && !backend.IsClosed(err) {
  718. l.Warnf("Fatal error: %v", err)
  719. obfuscateAndPanic(err)
  720. }
  721. }()
  722. var fixed int
  723. fixed, err = db.checkLocalNeed([]byte(folder))
  724. if err != nil {
  725. err = fmt.Errorf("checking local need: %w", err)
  726. return nil
  727. }
  728. if fixed != 0 {
  729. l.Infof("Repaired %d local need entries for folder %v in database", fixed, folder)
  730. }
  731. meta, err := db.recalcMeta(folder)
  732. if err != nil {
  733. err = fmt.Errorf("recalculating metadata: %w", err)
  734. return nil
  735. }
  736. fixed, err = db.repairSequenceGCLocked(folder, meta)
  737. if err != nil {
  738. err = fmt.Errorf("repairing sequences: %w", err)
  739. return nil
  740. }
  741. if fixed != 0 {
  742. l.Infof("Repaired %d sequence entries for folder %v in database", fixed, folder)
  743. }
  744. return meta
  745. }
  746. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  747. meta := newMetadataTracker(db.keyer)
  748. if err := meta.fromDB(db, []byte(folder)); err != nil {
  749. if err == errMetaInconsistent {
  750. l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
  751. } else {
  752. l.Infof("No stored folder metadata for %q; recalculating", folder)
  753. }
  754. return db.getMetaAndCheck(folder)
  755. }
  756. curSeq := meta.Sequence(protocol.LocalDeviceID)
  757. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  758. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  759. return db.getMetaAndCheck(folder)
  760. }
  761. if age := time.Since(meta.Created()); age > db.recheckInterval {
  762. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, util.NiceDurationString(age))
  763. return db.getMetaAndCheck(folder)
  764. }
  765. return meta
  766. }
  767. func (db *Lowlevel) recalcMeta(folderStr string) (*metadataTracker, error) {
  768. folder := []byte(folderStr)
  769. meta := newMetadataTracker(db.keyer)
  770. if err := db.checkGlobals(folder); err != nil {
  771. return nil, fmt.Errorf("checking globals: %w", err)
  772. }
  773. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  774. if err != nil {
  775. return nil, err
  776. }
  777. defer t.close()
  778. var deviceID protocol.DeviceID
  779. err = t.withAllFolderTruncated(folder, func(device []byte, f FileInfoTruncated) bool {
  780. copy(deviceID[:], device)
  781. meta.addFile(deviceID, f)
  782. return true
  783. })
  784. if err != nil {
  785. return nil, err
  786. }
  787. err = t.withGlobal(folder, nil, true, func(f protocol.FileIntf) bool {
  788. meta.addFile(protocol.GlobalDeviceID, f)
  789. return true
  790. })
  791. meta.emptyNeeded(protocol.LocalDeviceID)
  792. err = t.withNeed(folder, protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  793. meta.addNeeded(protocol.LocalDeviceID, f)
  794. return true
  795. })
  796. if err != nil {
  797. return nil, err
  798. }
  799. for _, device := range meta.devices() {
  800. meta.emptyNeeded(device)
  801. err = t.withNeed(folder, device[:], true, func(f protocol.FileIntf) bool {
  802. meta.addNeeded(device, f)
  803. return true
  804. })
  805. if err != nil {
  806. return nil, err
  807. }
  808. }
  809. meta.SetCreated()
  810. if err := t.Commit(); err != nil {
  811. return nil, err
  812. }
  813. return meta, nil
  814. }
  815. // Verify the local sequence number from actual sequence entries. Returns
  816. // true if it was all good, or false if a fixup was necessary.
  817. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  818. // Walk the sequence index from the current (supposedly) highest
  819. // sequence number and raise the alarm if we get anything. This recovers
  820. // from the occasion where we have written sequence entries to disk but
  821. // not yet written new metadata to disk.
  822. //
  823. // Note that we can have the same thing happen for remote devices but
  824. // there it's not a problem -- we'll simply advertise a lower sequence
  825. // number than we've actually seen and receive some duplicate updates
  826. // and then be in sync again.
  827. t, err := db.newReadOnlyTransaction()
  828. if err != nil {
  829. l.Warnf("Fatal error: %v", err)
  830. obfuscateAndPanic(err)
  831. }
  832. ok := true
  833. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  834. ok = false // we got something, which we should not have
  835. return false
  836. }); err != nil && !backend.IsClosed(err) {
  837. l.Warnf("Fatal error: %v", err)
  838. obfuscateAndPanic(err)
  839. }
  840. t.close()
  841. return ok
  842. }
  843. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  844. // match those in the corresponding file entries. It returns the amount of fixed
  845. // entries.
  846. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  847. t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
  848. if err != nil {
  849. return 0, err
  850. }
  851. defer t.close()
  852. fixed := 0
  853. folder := []byte(folderStr)
  854. // First check that every file entry has a matching sequence entry
  855. // (this was previously db schema upgrade to 9).
  856. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  857. if err != nil {
  858. return 0, err
  859. }
  860. it, err := t.NewPrefixIterator(dk.WithoutName())
  861. if err != nil {
  862. return 0, err
  863. }
  864. defer it.Release()
  865. var sk sequenceKey
  866. for it.Next() {
  867. intf, err := t.unmarshalTrunc(it.Value(), false)
  868. if err != nil {
  869. return 0, err
  870. }
  871. fi := intf.(protocol.FileInfo)
  872. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  873. return 0, err
  874. }
  875. switch dk, err = t.Get(sk); {
  876. case err != nil:
  877. if !backend.IsNotFound(err) {
  878. return 0, err
  879. }
  880. fallthrough
  881. case !bytes.Equal(it.Key(), dk):
  882. fixed++
  883. fi.Sequence = meta.nextLocalSeq()
  884. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  885. return 0, err
  886. }
  887. if err := t.Put(sk, it.Key()); err != nil {
  888. return 0, err
  889. }
  890. if err := t.putFile(it.Key(), fi); err != nil {
  891. return 0, err
  892. }
  893. }
  894. if err := t.Checkpoint(); err != nil {
  895. return 0, err
  896. }
  897. }
  898. if err := it.Error(); err != nil {
  899. return 0, err
  900. }
  901. it.Release()
  902. // Secondly check there's no sequence entries pointing at incorrect things.
  903. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  904. if err != nil {
  905. return 0, err
  906. }
  907. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  908. if err != nil {
  909. return 0, err
  910. }
  911. defer it.Release()
  912. for it.Next() {
  913. // Check that the sequence from the key matches the
  914. // sequence in the file.
  915. fi, ok, err := t.getFileTrunc(it.Value(), true)
  916. if err != nil {
  917. return 0, err
  918. }
  919. if ok {
  920. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  921. continue
  922. }
  923. }
  924. // Either the file is missing or has a different sequence number
  925. fixed++
  926. if err := t.Delete(it.Key()); err != nil {
  927. return 0, err
  928. }
  929. }
  930. if err := it.Error(); err != nil {
  931. return 0, err
  932. }
  933. it.Release()
  934. return fixed, t.Commit()
  935. }
  936. // Does not take care of metadata - if anything is repaired, the need count
  937. // needs to be recalculated.
  938. func (db *Lowlevel) checkLocalNeed(folder []byte) (int, error) {
  939. repaired := 0
  940. t, err := db.newReadWriteTransaction()
  941. if err != nil {
  942. return 0, err
  943. }
  944. defer t.close()
  945. key, err := t.keyer.GenerateNeedFileKey(nil, folder, nil)
  946. if err != nil {
  947. return 0, err
  948. }
  949. dbi, err := t.NewPrefixIterator(key.WithoutName())
  950. if err != nil {
  951. return 0, err
  952. }
  953. defer dbi.Release()
  954. var needName string
  955. var needDone bool
  956. next := func() {
  957. needDone = !dbi.Next()
  958. if !needDone {
  959. needName = string(t.keyer.NameFromGlobalVersionKey(dbi.Key()))
  960. }
  961. }
  962. next()
  963. t.withNeedIteratingGlobal(folder, protocol.LocalDeviceID[:], true, func(fi protocol.FileIntf) bool {
  964. f := fi.(FileInfoTruncated)
  965. for !needDone && needName < f.Name {
  966. repaired++
  967. if err = t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  968. return false
  969. }
  970. l.Debugln("check local need: removing", needName)
  971. next()
  972. }
  973. if needName == f.Name {
  974. next()
  975. } else {
  976. repaired++
  977. key, err = t.keyer.GenerateNeedFileKey(key, folder, []byte(f.Name))
  978. if err != nil {
  979. return false
  980. }
  981. if err = t.Put(key, nil); err != nil {
  982. return false
  983. }
  984. l.Debugln("check local need: adding", f.Name)
  985. }
  986. return true
  987. })
  988. if err != nil {
  989. return 0, err
  990. }
  991. for !needDone {
  992. repaired++
  993. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  994. return 0, err
  995. }
  996. l.Debugln("check local need: removing", needName)
  997. next()
  998. }
  999. if err := dbi.Error(); err != nil {
  1000. return 0, err
  1001. }
  1002. dbi.Release()
  1003. if err = t.Commit(); err != nil {
  1004. return 0, err
  1005. }
  1006. return repaired, nil
  1007. }
  1008. func (db *Lowlevel) needsRepairPath() string {
  1009. path := db.Location()
  1010. if path == "" {
  1011. return ""
  1012. }
  1013. if path[len(path)-1] == fs.PathSeparator {
  1014. path = path[:len(path)-1]
  1015. }
  1016. return path + needsRepairSuffix
  1017. }
  1018. // unchanged checks if two files are the same and thus don't need to be updated.
  1019. // Local flags or the invalid bit might change without the version
  1020. // being bumped.
  1021. func unchanged(nf, ef protocol.FileIntf) bool {
  1022. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  1023. }
  1024. var ldbPathRe = regexp.MustCompile(`(open|write|read) .+[\\/].+[\\/]index[^\\/]+[\\/][^\\/]+: `)
  1025. func obfuscateAndPanic(err error) {
  1026. panic(ldbPathRe.ReplaceAllString(err.Error(), "$1 x: "))
  1027. }