lowlevel.go 25 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "time"
  12. "github.com/greatroar/blobloom"
  13. "github.com/syncthing/syncthing/lib/db/backend"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sha256"
  16. "github.com/syncthing/syncthing/lib/sync"
  17. "github.com/syncthing/syncthing/lib/util"
  18. "github.com/thejerf/suture"
  19. )
const (
	// We set the bloom filter capacity to handle 100k individual items with
	// a false positive probability of 1% for the first pass. Once we know
	// how many items we have we will use that number instead, if it's more
	// than 100k. For fewer than 100k items we will just get better false
	// positive rate instead.
	indirectGCBloomCapacity          = 100000
	indirectGCBloomFalsePositiveRate = 0.01     // 1%
	indirectGCBloomMaxBytes          = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items

	// Default time between indirection GC runs; can be overridden with
	// WithIndirectGCInterval.
	indirectGCDefaultInterval = 13 * time.Hour
	// Misc data key under which the time of the last indirection GC run
	// is recorded (see recordTime and timeUntil).
	indirectGCTimeKey = "lastIndirectGCTime"

	// Use indirection for the block list when it exceeds this many entries
	blocksIndirectionCutoff = 3
	// Use indirection for the version vector when it exceeds this many entries
	versionIndirectionCutoff = 10

	// Default maximum age of stored folder metadata before it gets
	// recalculated; can be overridden with WithRecheckInterval.
	recheckDefaultInterval = 30 * 24 * time.Hour
)
// Lowlevel is the lowest level database interface. It has a very simple
// purpose: hold the actual backend database, and the in-memory state
// that belongs to that database. In the same way that a single on disk
// database can only be opened once, there should be only one Lowlevel for
// any given backend.
type Lowlevel struct {
	*suture.Supervisor // supervises the background gcRunner service
	backend.Backend    // the underlying key/value store
	folderIdx          *smallIndex // folder index; its values are what ListFolders returns
	deviceIdx          *smallIndex // device index, used together with folderIdx by the keyer
	keyer              keyer       // generates the database keys used throughout this file
	// gcMut is held for reading by the operations that modify file entries
	// and held exclusively by gcIndirect while it garbage collects.
	gcMut sync.RWMutex
	// gcKeyCount is the number of indirected keys kept by the previous GC
	// pass; it sizes the bloom filters of the next pass.
	gcKeyCount         int
	indirectGCInterval time.Duration // time between indirection GC runs
	recheckInterval    time.Duration // maximum age of stored metadata before recalculation
}
  53. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  54. db := &Lowlevel{
  55. Supervisor: suture.New("db.Lowlevel", suture.Spec{
  56. // Only log restarts in debug mode.
  57. Log: func(line string) {
  58. l.Debugln(line)
  59. },
  60. PassThroughPanics: true,
  61. }),
  62. Backend: backend,
  63. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  64. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  65. gcMut: sync.NewRWMutex(),
  66. indirectGCInterval: indirectGCDefaultInterval,
  67. recheckInterval: recheckDefaultInterval,
  68. }
  69. for _, opt := range opts {
  70. opt(db)
  71. }
  72. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  73. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  74. return db
  75. }
// Option is a functional option that adjusts a Lowlevel while it is being
// constructed by NewLowlevel.
type Option func(*Lowlevel)
  77. // WithRecheckInterval sets the time interval in between metadata recalculations
  78. // and consistency checks.
  79. func WithRecheckInterval(dur time.Duration) Option {
  80. return func(db *Lowlevel) {
  81. if dur > 0 {
  82. db.recheckInterval = dur
  83. }
  84. }
  85. }
  86. // WithIndirectGCInterval sets the time interval in between GC runs.
  87. func WithIndirectGCInterval(dur time.Duration) Option {
  88. return func(db *Lowlevel) {
  89. if dur > 0 {
  90. db.indirectGCInterval = dur
  91. }
  92. }
  93. }
  94. // ListFolders returns the list of folders currently in the database
  95. func (db *Lowlevel) ListFolders() []string {
  96. return db.folderIdx.Values()
  97. }
  98. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  99. // global versionlist and metadata.
  100. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  101. db.gcMut.RLock()
  102. defer db.gcMut.RUnlock()
  103. t, err := db.newReadWriteTransaction()
  104. if err != nil {
  105. return err
  106. }
  107. defer t.close()
  108. var dk, gk, keyBuf []byte
  109. devID := protocol.DeviceIDFromBytes(device)
  110. for _, f := range fs {
  111. name := []byte(f.Name)
  112. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  113. if err != nil {
  114. return err
  115. }
  116. ef, ok, err := t.getFileTrunc(dk, true)
  117. if err != nil {
  118. return err
  119. }
  120. if ok && unchanged(f, ef) {
  121. continue
  122. }
  123. if ok {
  124. meta.removeFile(devID, ef)
  125. }
  126. meta.addFile(devID, f)
  127. l.Debugf("insert; folder=%q device=%v %v", folder, devID, f)
  128. if err := t.putFile(dk, f, false); err != nil {
  129. return err
  130. }
  131. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  132. if err != nil {
  133. return err
  134. }
  135. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  136. if err != nil {
  137. return err
  138. }
  139. if err := t.Checkpoint(func() error {
  140. return meta.toDB(t, folder)
  141. }); err != nil {
  142. return err
  143. }
  144. }
  145. if err := meta.toDB(t, folder); err != nil {
  146. return err
  147. }
  148. return t.Commit()
  149. }
  150. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  151. // metadata, sequence and blockmap buckets.
  152. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  153. db.gcMut.RLock()
  154. defer db.gcMut.RUnlock()
  155. t, err := db.newReadWriteTransaction()
  156. if err != nil {
  157. return err
  158. }
  159. defer t.close()
  160. var dk, gk, keyBuf []byte
  161. blockBuf := make([]byte, 4)
  162. for _, f := range fs {
  163. name := []byte(f.Name)
  164. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  165. if err != nil {
  166. return err
  167. }
  168. ef, ok, err := t.getFileByKey(dk)
  169. if err != nil {
  170. return err
  171. }
  172. if ok && unchanged(f, ef) {
  173. continue
  174. }
  175. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  176. if ok {
  177. if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
  178. for _, block := range ef.Blocks {
  179. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  180. if err != nil {
  181. return err
  182. }
  183. if err := t.Delete(keyBuf); err != nil {
  184. return err
  185. }
  186. }
  187. if !blocksHashSame {
  188. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  189. if err != nil {
  190. return err
  191. }
  192. if err = t.Delete(keyBuf); err != nil {
  193. return err
  194. }
  195. }
  196. }
  197. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  198. if err != nil {
  199. return err
  200. }
  201. if err := t.Delete(keyBuf); err != nil {
  202. return err
  203. }
  204. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  205. }
  206. f.Sequence = meta.nextLocalSeq()
  207. if ok {
  208. meta.removeFile(protocol.LocalDeviceID, ef)
  209. }
  210. meta.addFile(protocol.LocalDeviceID, f)
  211. l.Debugf("insert (local); folder=%q %v", folder, f)
  212. if err := t.putFile(dk, f, false); err != nil {
  213. return err
  214. }
  215. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  216. if err != nil {
  217. return err
  218. }
  219. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  220. if err != nil {
  221. return err
  222. }
  223. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  224. if err != nil {
  225. return err
  226. }
  227. if err := t.Put(keyBuf, dk); err != nil {
  228. return err
  229. }
  230. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  231. if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() {
  232. for i, block := range f.Blocks {
  233. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  234. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  235. if err != nil {
  236. return err
  237. }
  238. if err := t.Put(keyBuf, blockBuf); err != nil {
  239. return err
  240. }
  241. }
  242. if !blocksHashSame {
  243. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  244. if err != nil {
  245. return err
  246. }
  247. if err = t.Put(keyBuf, nil); err != nil {
  248. return err
  249. }
  250. }
  251. }
  252. if err := t.Checkpoint(func() error {
  253. return meta.toDB(t, folder)
  254. }); err != nil {
  255. return err
  256. }
  257. }
  258. if err := meta.toDB(t, folder); err != nil {
  259. return err
  260. }
  261. return t.Commit()
  262. }
  263. func (db *Lowlevel) dropFolder(folder []byte) error {
  264. db.gcMut.RLock()
  265. defer db.gcMut.RUnlock()
  266. t, err := db.newReadWriteTransaction()
  267. if err != nil {
  268. return err
  269. }
  270. defer t.close()
  271. // Remove all items related to the given folder from the device->file bucket
  272. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  273. if err != nil {
  274. return err
  275. }
  276. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  277. return err
  278. }
  279. // Remove all sequences related to the folder
  280. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  281. if err != nil {
  282. return err
  283. }
  284. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  285. return err
  286. }
  287. // Remove all items related to the given folder from the global bucket
  288. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  289. if err != nil {
  290. return err
  291. }
  292. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  293. return err
  294. }
  295. // Remove all needs related to the folder
  296. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  297. if err != nil {
  298. return err
  299. }
  300. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  301. return err
  302. }
  303. // Remove the blockmap of the folder
  304. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  305. if err != nil {
  306. return err
  307. }
  308. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  309. return err
  310. }
  311. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  312. if err != nil {
  313. return err
  314. }
  315. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  316. return err
  317. }
  318. return t.Commit()
  319. }
  320. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  321. db.gcMut.RLock()
  322. defer db.gcMut.RUnlock()
  323. t, err := db.newReadWriteTransaction()
  324. if err != nil {
  325. return err
  326. }
  327. defer t.close()
  328. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  329. if err != nil {
  330. return err
  331. }
  332. dbi, err := t.NewPrefixIterator(key)
  333. if err != nil {
  334. return err
  335. }
  336. var gk, keyBuf []byte
  337. for dbi.Next() {
  338. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  339. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  340. if err != nil {
  341. return err
  342. }
  343. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  344. if err != nil {
  345. return err
  346. }
  347. if err := t.Delete(dbi.Key()); err != nil {
  348. return err
  349. }
  350. if err := t.Checkpoint(); err != nil {
  351. return err
  352. }
  353. }
  354. if err := dbi.Error(); err != nil {
  355. return err
  356. }
  357. dbi.Release()
  358. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  359. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  360. if err != nil {
  361. return err
  362. }
  363. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  364. return err
  365. }
  366. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  367. if err != nil {
  368. return err
  369. }
  370. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  371. return err
  372. }
  373. }
  374. return t.Commit()
  375. }
  376. func (db *Lowlevel) checkGlobals(folder []byte, meta *metadataTracker) error {
  377. t, err := db.newReadWriteTransaction()
  378. if err != nil {
  379. return err
  380. }
  381. defer t.close()
  382. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  383. if err != nil {
  384. return err
  385. }
  386. dbi, err := t.NewPrefixIterator(key.WithoutName())
  387. if err != nil {
  388. return err
  389. }
  390. defer dbi.Release()
  391. var dk []byte
  392. for dbi.Next() {
  393. var vl VersionList
  394. if err := vl.Unmarshal(dbi.Value()); err != nil || len(vl.Versions) == 0 {
  395. if err := t.Delete(dbi.Key()); err != nil {
  396. return err
  397. }
  398. continue
  399. }
  400. // Check the global version list for consistency. An issue in previous
  401. // versions of goleveldb could result in reordered writes so that
  402. // there are global entries pointing to no longer existing files. Here
  403. // we find those and clear them out.
  404. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  405. var newVL VersionList
  406. for i, version := range vl.Versions {
  407. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, version.Device, name)
  408. if err != nil {
  409. return err
  410. }
  411. _, err := t.Get(dk)
  412. if backend.IsNotFound(err) {
  413. continue
  414. }
  415. if err != nil {
  416. return err
  417. }
  418. newVL.Versions = append(newVL.Versions, version)
  419. if i == 0 {
  420. if fi, ok, err := t.getFileTrunc(dk, true); err != nil {
  421. return err
  422. } else if ok {
  423. meta.addFile(protocol.GlobalDeviceID, fi)
  424. }
  425. }
  426. }
  427. if newLen := len(newVL.Versions); newLen == 0 {
  428. if err := t.Delete(dbi.Key()); err != nil {
  429. return err
  430. }
  431. } else if newLen != len(vl.Versions) {
  432. if err := t.Put(dbi.Key(), mustMarshal(&newVL)); err != nil {
  433. return err
  434. }
  435. }
  436. }
  437. if err := dbi.Error(); err != nil {
  438. return err
  439. }
  440. l.Debugf("db check completed for %q", folder)
  441. return t.Commit()
  442. }
  443. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  444. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  445. if err != nil {
  446. return 0, err
  447. }
  448. cur, err := db.Get(key)
  449. if backend.IsNotFound(err) {
  450. return 0, nil
  451. } else if err != nil {
  452. return 0, err
  453. }
  454. var id protocol.IndexID
  455. if err := id.Unmarshal(cur); err != nil {
  456. return 0, nil
  457. }
  458. return id, nil
  459. }
  460. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  461. bs, _ := id.Marshal() // marshalling can't fail
  462. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  463. if err != nil {
  464. return err
  465. }
  466. return db.Put(key, bs)
  467. }
  468. func (db *Lowlevel) dropMtimes(folder []byte) error {
  469. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  470. if err != nil {
  471. return err
  472. }
  473. return db.dropPrefix(key)
  474. }
  475. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  476. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  477. if err != nil {
  478. return err
  479. }
  480. return db.dropPrefix(key)
  481. }
  482. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  483. t, err := db.newReadWriteTransaction()
  484. if err != nil {
  485. return err
  486. }
  487. defer t.close()
  488. if err := t.deleteKeyPrefix(prefix); err != nil {
  489. return err
  490. }
  491. return t.Commit()
  492. }
  493. func (db *Lowlevel) gcRunner(ctx context.Context) {
  494. // Calculate the time for the next GC run. Even if we should run GC
  495. // directly, give the system a while to get up and running and do other
  496. // stuff first. (We might have migrations and stuff which would be
  497. // better off running before GC.)
  498. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  499. if next < time.Minute {
  500. next = time.Minute
  501. }
  502. t := time.NewTimer(next)
  503. defer t.Stop()
  504. for {
  505. select {
  506. case <-ctx.Done():
  507. return
  508. case <-t.C:
  509. if err := db.gcIndirect(ctx); err != nil {
  510. l.Warnln("Database indirection GC failed:", err)
  511. }
  512. db.recordTime(indirectGCTimeKey)
  513. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  514. }
  515. }
  516. }
  517. // recordTime records the current time under the given key, affecting the
  518. // next call to timeUntil with the same key.
  519. func (db *Lowlevel) recordTime(key string) {
  520. miscDB := NewMiscDataNamespace(db)
  521. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  522. }
  523. // timeUntil returns how long we should wait until the next interval, or
  524. // zero if it should happen directly.
  525. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  526. miscDB := NewMiscDataNamespace(db)
  527. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  528. nextTime := time.Unix(lastTime, 0).Add(every)
  529. sleepTime := time.Until(nextTime)
  530. if sleepTime < 0 {
  531. sleepTime = 0
  532. }
  533. return sleepTime
  534. }
  535. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  536. // The indirection GC uses bloom filters to track used block lists and
  537. // versions. This means iterating over all items, adding their hashes to
  538. // the filter, then iterating over the indirected items and removing
  539. // those that don't match the filter. The filter will give false
  540. // positives so we will keep around one percent of things that we don't
  541. // really need (at most).
  542. //
  543. // Indirection GC needs to run when there are no modifications to the
  544. // FileInfos or indirected items.
  545. db.gcMut.Lock()
  546. defer db.gcMut.Unlock()
  547. t, err := db.newReadWriteTransaction()
  548. if err != nil {
  549. return err
  550. }
  551. defer t.Release()
  552. // Set up the bloom filters with the initial capacity and false positive
  553. // rate, or higher capacity if we've done this before and seen lots of
  554. // items. For simplicity's sake we track just one count, which is the
  555. // highest of the various indirected items.
  556. capacity := indirectGCBloomCapacity
  557. if db.gcKeyCount > capacity {
  558. capacity = db.gcKeyCount
  559. }
  560. blockFilter := newBloomFilter(capacity)
  561. versionFilter := newBloomFilter(capacity)
  562. // Iterate the FileInfos, unmarshal the block and version hashes and
  563. // add them to the filter.
  564. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  565. if err != nil {
  566. return err
  567. }
  568. defer it.Release()
  569. for it.Next() {
  570. select {
  571. case <-ctx.Done():
  572. return ctx.Err()
  573. default:
  574. }
  575. var hashes IndirectionHashesOnly
  576. if err := hashes.Unmarshal(it.Value()); err != nil {
  577. return err
  578. }
  579. if len(hashes.BlocksHash) > 0 {
  580. blockFilter.Add(bloomHash(hashes.BlocksHash))
  581. }
  582. if len(hashes.VersionHash) > 0 {
  583. versionFilter.Add(bloomHash(hashes.VersionHash))
  584. }
  585. }
  586. it.Release()
  587. if err := it.Error(); err != nil {
  588. return err
  589. }
  590. // Iterate over block lists, removing keys with hashes that don't match
  591. // the filter.
  592. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  593. if err != nil {
  594. return err
  595. }
  596. defer it.Release()
  597. matchedBlocks := 0
  598. for it.Next() {
  599. select {
  600. case <-ctx.Done():
  601. return ctx.Err()
  602. default:
  603. }
  604. key := blockListKey(it.Key())
  605. if blockFilter.Has(bloomHash(key.Hash())) {
  606. matchedBlocks++
  607. continue
  608. }
  609. if err := t.Delete(key); err != nil {
  610. return err
  611. }
  612. }
  613. it.Release()
  614. if err := it.Error(); err != nil {
  615. return err
  616. }
  617. // Iterate over version lists, removing keys with hashes that don't match
  618. // the filter.
  619. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  620. if err != nil {
  621. return err
  622. }
  623. matchedVersions := 0
  624. for it.Next() {
  625. select {
  626. case <-ctx.Done():
  627. return ctx.Err()
  628. default:
  629. }
  630. key := versionKey(it.Key())
  631. if versionFilter.Has(bloomHash(key.Hash())) {
  632. matchedVersions++
  633. continue
  634. }
  635. if err := t.Delete(key); err != nil {
  636. return err
  637. }
  638. }
  639. it.Release()
  640. if err := it.Error(); err != nil {
  641. return err
  642. }
  643. // Remember the number of unique keys we kept until the next pass.
  644. db.gcKeyCount = matchedBlocks
  645. if matchedVersions > matchedBlocks {
  646. db.gcKeyCount = matchedVersions
  647. }
  648. if err := t.Commit(); err != nil {
  649. return err
  650. }
  651. return db.Compact()
  652. }
  653. func newBloomFilter(capacity int) *blobloom.Filter {
  654. return blobloom.NewOptimized(blobloom.Config{
  655. Capacity: uint64(capacity),
  656. FPRate: indirectGCBloomFalsePositiveRate,
  657. MaxBits: 8 * indirectGCBloomMaxBytes,
  658. })
  659. }
  660. // Hash function for the bloomfilter: first eight bytes of the SHA-256.
  661. // Big or little-endian makes no difference, as long as we're consistent.
  662. func bloomHash(key []byte) uint64 {
  663. if len(key) != sha256.Size {
  664. panic("bug: bloomHash passed something not a SHA256 hash")
  665. }
  666. return binary.BigEndian.Uint64(key)
  667. }
  668. // CheckRepair checks folder metadata and sequences for miscellaneous errors.
  669. func (db *Lowlevel) CheckRepair() {
  670. for _, folder := range db.ListFolders() {
  671. _ = db.getMetaAndCheck(folder)
  672. }
  673. }
  674. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  675. db.gcMut.RLock()
  676. defer db.gcMut.RUnlock()
  677. meta, err := db.recalcMeta(folder)
  678. if err == nil {
  679. var fixed int
  680. fixed, err = db.repairSequenceGCLocked(folder, meta)
  681. if fixed != 0 {
  682. l.Infof("Repaired %d sequence entries in database", fixed)
  683. }
  684. }
  685. if backend.IsClosed(err) {
  686. return nil
  687. } else if err != nil {
  688. panic(err)
  689. }
  690. return meta
  691. }
  692. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  693. meta := newMetadataTracker()
  694. if err := meta.fromDB(db, []byte(folder)); err != nil {
  695. l.Infof("No stored folder metadata for %q; recalculating", folder)
  696. return db.getMetaAndCheck(folder)
  697. }
  698. curSeq := meta.Sequence(protocol.LocalDeviceID)
  699. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  700. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  701. return db.getMetaAndCheck(folder)
  702. }
  703. if age := time.Since(meta.Created()); age > db.recheckInterval {
  704. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
  705. return db.getMetaAndCheck(folder)
  706. }
  707. return meta
  708. }
  709. func (db *Lowlevel) recalcMeta(folder string) (*metadataTracker, error) {
  710. meta := newMetadataTracker()
  711. if err := db.checkGlobals([]byte(folder), meta); err != nil {
  712. return nil, err
  713. }
  714. t, err := db.newReadWriteTransaction()
  715. if err != nil {
  716. return nil, err
  717. }
  718. defer t.close()
  719. var deviceID protocol.DeviceID
  720. err = t.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
  721. copy(deviceID[:], device)
  722. meta.addFile(deviceID, f)
  723. return true
  724. })
  725. if err != nil {
  726. return nil, err
  727. }
  728. meta.emptyNeeded(protocol.LocalDeviceID)
  729. err = t.withNeed([]byte(folder), protocol.LocalDeviceID[:], true, func(f FileIntf) bool {
  730. meta.addNeeded(protocol.LocalDeviceID, f)
  731. return true
  732. })
  733. if err != nil {
  734. return nil, err
  735. }
  736. for _, device := range meta.devices() {
  737. meta.emptyNeeded(device)
  738. err = t.withNeed([]byte(folder), device[:], true, func(f FileIntf) bool {
  739. meta.addNeeded(device, f)
  740. return true
  741. })
  742. if err != nil {
  743. return nil, err
  744. }
  745. }
  746. meta.SetCreated()
  747. if err := meta.toDB(t, []byte(folder)); err != nil {
  748. return nil, err
  749. }
  750. if err := t.Commit(); err != nil {
  751. return nil, err
  752. }
  753. return meta, nil
  754. }
  755. // Verify the local sequence number from actual sequence entries. Returns
  756. // true if it was all good, or false if a fixup was necessary.
  757. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  758. // Walk the sequence index from the current (supposedly) highest
  759. // sequence number and raise the alarm if we get anything. This recovers
  760. // from the occasion where we have written sequence entries to disk but
  761. // not yet written new metadata to disk.
  762. //
  763. // Note that we can have the same thing happen for remote devices but
  764. // there it's not a problem -- we'll simply advertise a lower sequence
  765. // number than we've actually seen and receive some duplicate updates
  766. // and then be in sync again.
  767. t, err := db.newReadOnlyTransaction()
  768. if err != nil {
  769. panic(err)
  770. }
  771. ok := true
  772. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi FileIntf) bool {
  773. ok = false // we got something, which we should not have
  774. return false
  775. }); err != nil && !backend.IsClosed(err) {
  776. panic(err)
  777. }
  778. t.close()
  779. return ok
  780. }
  781. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  782. // match those in the corresponding file entries. It returns the amount of fixed
  783. // entries.
  784. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  785. t, err := db.newReadWriteTransaction()
  786. if err != nil {
  787. return 0, err
  788. }
  789. defer t.close()
  790. fixed := 0
  791. folder := []byte(folderStr)
  792. // First check that every file entry has a matching sequence entry
  793. // (this was previously db schema upgrade to 9).
  794. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  795. if err != nil {
  796. return 0, err
  797. }
  798. it, err := t.NewPrefixIterator(dk.WithoutName())
  799. if err != nil {
  800. return 0, err
  801. }
  802. defer it.Release()
  803. var sk sequenceKey
  804. for it.Next() {
  805. intf, err := t.unmarshalTrunc(it.Value(), true)
  806. if err != nil {
  807. return 0, err
  808. }
  809. fi := intf.(FileInfoTruncated)
  810. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  811. return 0, err
  812. }
  813. switch dk, err = t.Get(sk); {
  814. case err != nil:
  815. if !backend.IsNotFound(err) {
  816. return 0, err
  817. }
  818. fallthrough
  819. case !bytes.Equal(it.Key(), dk):
  820. fixed++
  821. fi.Sequence = meta.nextLocalSeq()
  822. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  823. return 0, err
  824. }
  825. if err := t.Put(sk, it.Key()); err != nil {
  826. return 0, err
  827. }
  828. if err := t.putFile(it.Key(), fi.copyToFileInfo(), true); err != nil {
  829. return 0, err
  830. }
  831. }
  832. if err := t.Checkpoint(func() error {
  833. return meta.toDB(t, folder)
  834. }); err != nil {
  835. return 0, err
  836. }
  837. }
  838. if err := it.Error(); err != nil {
  839. return 0, err
  840. }
  841. it.Release()
  842. // Secondly check there's no sequence entries pointing at incorrect things.
  843. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  844. if err != nil {
  845. return 0, err
  846. }
  847. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  848. if err != nil {
  849. return 0, err
  850. }
  851. defer it.Release()
  852. for it.Next() {
  853. // Check that the sequence from the key matches the
  854. // sequence in the file.
  855. fi, ok, err := t.getFileTrunc(it.Value(), true)
  856. if err != nil {
  857. return 0, err
  858. }
  859. if ok {
  860. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  861. continue
  862. }
  863. }
  864. // Either the file is missing or has a different sequence number
  865. fixed++
  866. if err := t.Delete(it.Key()); err != nil {
  867. return 0, err
  868. }
  869. }
  870. if err := it.Error(); err != nil {
  871. return 0, err
  872. }
  873. it.Release()
  874. if err := meta.toDB(t, folder); err != nil {
  875. return 0, err
  876. }
  877. return fixed, t.Commit()
  878. }
  879. // unchanged checks if two files are the same and thus don't need to be updated.
  880. // Local flags or the invalid bit might change without the version
  881. // being bumped.
  882. func unchanged(nf, ef FileIntf) bool {
  883. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  884. }