lowlevel.go 22 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "time"
  12. "github.com/greatroar/blobloom"
  13. "github.com/syncthing/syncthing/lib/db/backend"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sync"
  16. "github.com/syncthing/syncthing/lib/util"
  17. "github.com/thejerf/suture"
  18. )
const (
	// We set the bloom filter capacity to handle 100k individual items with
	// a false positive probability of 1% for the first pass. Once we know
	// how many items we have we will use that number instead, if it's more
	// than 100k. For fewer than 100k items we will just get better false
	// positive rate instead.
	indirectGCBloomCapacity = 100000
	indirectGCBloomFalsePositiveRate = 0.01 // 1%
	indirectGCBloomMaxBytes = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
	// indirectGCDefaultInterval is the time between indirection GC runs
	// unless another interval is set with WithIndirectGCInterval.
	indirectGCDefaultInterval = 13 * time.Hour
	// indirectGCTimeKey is the misc data key under which the time of the
	// latest indirection GC run is recorded.
	indirectGCTimeKey = "lastIndirectGCTime"
	// Use indirection for the block list when it exceeds this many entries
	blocksIndirectionCutoff = 3
	// recheckDefaultInterval is the age beyond which stored folder metadata
	// is recalculated when loaded, unless another interval is set with
	// WithRecheckInterval.
	recheckDefaultInterval = 30 * 24 * time.Hour
)
// Lowlevel is the lowest level database interface. It has a very simple
// purpose: hold the actual backend database, and the in-memory state
// that belong to that database. In the same way that a single on disk
// database can only be opened once, there should be only one Lowlevel for
// any given backend.
type Lowlevel struct {
	// The supervisor runs the background GC service added by NewLowlevel.
	*suture.Supervisor
	// The backend database; its methods are promoted onto Lowlevel.
	backend.Backend
	// Small indexes over the backend for folders and devices, respectively.
	// Both are handed to the keyer.
	folderIdx *smallIndex
	deviceIdx *smallIndex
	// keyer generates the database keys used throughout this type.
	keyer keyer
	// gcMut is read-locked by the operations that modify file entries and
	// write-locked by gcIndirect for the duration of a GC run.
	gcMut sync.RWMutex
	// gcKeyCount is the number of block lists kept by the latest GC run,
	// used to size the bloom filter on the next run.
	gcKeyCount int
	// indirectGCInterval is the time between indirection GC runs.
	indirectGCInterval time.Duration
	// recheckInterval is the maximum age of stored folder metadata before
	// it gets recalculated on load.
	recheckInterval time.Duration
}
  50. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  51. db := &Lowlevel{
  52. Supervisor: suture.New("db.Lowlevel", suture.Spec{
  53. // Only log restarts in debug mode.
  54. Log: func(line string) {
  55. l.Debugln(line)
  56. },
  57. PassThroughPanics: true,
  58. }),
  59. Backend: backend,
  60. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  61. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  62. gcMut: sync.NewRWMutex(),
  63. indirectGCInterval: indirectGCDefaultInterval,
  64. recheckInterval: recheckDefaultInterval,
  65. }
  66. for _, opt := range opts {
  67. opt(db)
  68. }
  69. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  70. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  71. return db
  72. }
// Option is a function that adjusts a Lowlevel. NewLowlevel applies the
// given options, in order, after setting the defaults.
type Option func(*Lowlevel)
  74. // WithRecheckInterval sets the time interval in between metadata recalculations
  75. // and consistency checks.
  76. func WithRecheckInterval(dur time.Duration) Option {
  77. return func(db *Lowlevel) {
  78. if dur > 0 {
  79. db.recheckInterval = dur
  80. }
  81. }
  82. }
  83. // WithIndirectGCInterval sets the time interval in between GC runs.
  84. func WithIndirectGCInterval(dur time.Duration) Option {
  85. return func(db *Lowlevel) {
  86. if dur > 0 {
  87. db.indirectGCInterval = dur
  88. }
  89. }
  90. }
  91. // ListFolders returns the list of folders currently in the database
  92. func (db *Lowlevel) ListFolders() []string {
  93. return db.folderIdx.Values()
  94. }
  95. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  96. // global versionlist and metadata.
  97. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  98. db.gcMut.RLock()
  99. defer db.gcMut.RUnlock()
  100. t, err := db.newReadWriteTransaction()
  101. if err != nil {
  102. return err
  103. }
  104. defer t.close()
  105. var dk, gk, keyBuf []byte
  106. devID := protocol.DeviceIDFromBytes(device)
  107. for _, f := range fs {
  108. name := []byte(f.Name)
  109. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  110. if err != nil {
  111. return err
  112. }
  113. ef, ok, err := t.getFileTrunc(dk, true)
  114. if err != nil {
  115. return err
  116. }
  117. if ok && unchanged(f, ef) {
  118. continue
  119. }
  120. if ok {
  121. meta.removeFile(devID, ef)
  122. }
  123. meta.addFile(devID, f)
  124. l.Debugf("insert; folder=%q device=%v %v", folder, devID, f)
  125. if err := t.putFile(dk, f, false); err != nil {
  126. return err
  127. }
  128. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  129. if err != nil {
  130. return err
  131. }
  132. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  133. if err != nil {
  134. return err
  135. }
  136. if err := t.Checkpoint(func() error {
  137. return meta.toDB(t, folder)
  138. }); err != nil {
  139. return err
  140. }
  141. }
  142. if err := meta.toDB(t, folder); err != nil {
  143. return err
  144. }
  145. return t.Commit()
  146. }
  147. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  148. // metadata, sequence and blockmap buckets.
  149. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  150. db.gcMut.RLock()
  151. defer db.gcMut.RUnlock()
  152. t, err := db.newReadWriteTransaction()
  153. if err != nil {
  154. return err
  155. }
  156. defer t.close()
  157. var dk, gk, keyBuf []byte
  158. blockBuf := make([]byte, 4)
  159. for _, f := range fs {
  160. name := []byte(f.Name)
  161. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  162. if err != nil {
  163. return err
  164. }
  165. ef, ok, err := t.getFileByKey(dk)
  166. if err != nil {
  167. return err
  168. }
  169. if ok && unchanged(f, ef) {
  170. continue
  171. }
  172. if ok {
  173. if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
  174. for _, block := range ef.Blocks {
  175. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  176. if err != nil {
  177. return err
  178. }
  179. if err := t.Delete(keyBuf); err != nil {
  180. return err
  181. }
  182. }
  183. }
  184. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  185. if err != nil {
  186. return err
  187. }
  188. if err := t.Delete(keyBuf); err != nil {
  189. return err
  190. }
  191. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  192. }
  193. f.Sequence = meta.nextLocalSeq()
  194. if ok {
  195. meta.removeFile(protocol.LocalDeviceID, ef)
  196. }
  197. meta.addFile(protocol.LocalDeviceID, f)
  198. l.Debugf("insert (local); folder=%q %v", folder, f)
  199. if err := t.putFile(dk, f, false); err != nil {
  200. return err
  201. }
  202. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  203. if err != nil {
  204. return err
  205. }
  206. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  207. if err != nil {
  208. return err
  209. }
  210. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  211. if err != nil {
  212. return err
  213. }
  214. if err := t.Put(keyBuf, dk); err != nil {
  215. return err
  216. }
  217. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  218. if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() {
  219. for i, block := range f.Blocks {
  220. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  221. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  222. if err != nil {
  223. return err
  224. }
  225. if err := t.Put(keyBuf, blockBuf); err != nil {
  226. return err
  227. }
  228. }
  229. }
  230. if err := t.Checkpoint(func() error {
  231. return meta.toDB(t, folder)
  232. }); err != nil {
  233. return err
  234. }
  235. }
  236. if err := meta.toDB(t, folder); err != nil {
  237. return err
  238. }
  239. return t.Commit()
  240. }
  241. func (db *Lowlevel) dropFolder(folder []byte) error {
  242. db.gcMut.RLock()
  243. defer db.gcMut.RUnlock()
  244. t, err := db.newReadWriteTransaction()
  245. if err != nil {
  246. return err
  247. }
  248. defer t.close()
  249. // Remove all items related to the given folder from the device->file bucket
  250. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  251. if err != nil {
  252. return err
  253. }
  254. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  255. return err
  256. }
  257. // Remove all sequences related to the folder
  258. k1, err := db.keyer.GenerateSequenceKey(nil, folder, 0)
  259. if err != nil {
  260. return err
  261. }
  262. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  263. return err
  264. }
  265. // Remove all items related to the given folder from the global bucket
  266. k2, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  267. if err != nil {
  268. return err
  269. }
  270. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  271. return err
  272. }
  273. // Remove all needs related to the folder
  274. k3, err := db.keyer.GenerateNeedFileKey(nil, folder, nil)
  275. if err != nil {
  276. return err
  277. }
  278. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  279. return err
  280. }
  281. // Remove the blockmap of the folder
  282. k4, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  283. if err != nil {
  284. return err
  285. }
  286. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  287. return err
  288. }
  289. return t.Commit()
  290. }
  291. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  292. db.gcMut.RLock()
  293. defer db.gcMut.RUnlock()
  294. t, err := db.newReadWriteTransaction()
  295. if err != nil {
  296. return err
  297. }
  298. defer t.close()
  299. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  300. if err != nil {
  301. return err
  302. }
  303. dbi, err := t.NewPrefixIterator(key)
  304. if err != nil {
  305. return err
  306. }
  307. var gk, keyBuf []byte
  308. for dbi.Next() {
  309. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  310. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  311. if err != nil {
  312. return err
  313. }
  314. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  315. if err != nil {
  316. return err
  317. }
  318. if err := t.Delete(dbi.Key()); err != nil {
  319. return err
  320. }
  321. if err := t.Checkpoint(); err != nil {
  322. return err
  323. }
  324. }
  325. if err := dbi.Error(); err != nil {
  326. return err
  327. }
  328. dbi.Release()
  329. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  330. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  331. if err != nil {
  332. return err
  333. }
  334. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  335. return err
  336. }
  337. }
  338. return t.Commit()
  339. }
  340. func (db *Lowlevel) checkGlobals(folder []byte, meta *metadataTracker) error {
  341. t, err := db.newReadWriteTransaction()
  342. if err != nil {
  343. return err
  344. }
  345. defer t.close()
  346. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  347. if err != nil {
  348. return err
  349. }
  350. dbi, err := t.NewPrefixIterator(key.WithoutName())
  351. if err != nil {
  352. return err
  353. }
  354. defer dbi.Release()
  355. var dk []byte
  356. for dbi.Next() {
  357. var vl VersionList
  358. if err := vl.Unmarshal(dbi.Value()); err != nil || len(vl.Versions) == 0 {
  359. if err := t.Delete(dbi.Key()); err != nil {
  360. return err
  361. }
  362. continue
  363. }
  364. // Check the global version list for consistency. An issue in previous
  365. // versions of goleveldb could result in reordered writes so that
  366. // there are global entries pointing to no longer existing files. Here
  367. // we find those and clear them out.
  368. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  369. var newVL VersionList
  370. for i, version := range vl.Versions {
  371. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, version.Device, name)
  372. if err != nil {
  373. return err
  374. }
  375. _, err := t.Get(dk)
  376. if backend.IsNotFound(err) {
  377. continue
  378. }
  379. if err != nil {
  380. return err
  381. }
  382. newVL.Versions = append(newVL.Versions, version)
  383. if i == 0 {
  384. if fi, ok, err := t.getFileTrunc(dk, true); err != nil {
  385. return err
  386. } else if ok {
  387. meta.addFile(protocol.GlobalDeviceID, fi)
  388. }
  389. }
  390. }
  391. if newLen := len(newVL.Versions); newLen == 0 {
  392. if err := t.Delete(dbi.Key()); err != nil {
  393. return err
  394. }
  395. } else if newLen != len(vl.Versions) {
  396. if err := t.Put(dbi.Key(), mustMarshal(&newVL)); err != nil {
  397. return err
  398. }
  399. }
  400. }
  401. if err := dbi.Error(); err != nil {
  402. return err
  403. }
  404. l.Debugf("db check completed for %q", folder)
  405. return t.Commit()
  406. }
  407. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  408. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  409. if err != nil {
  410. return 0, err
  411. }
  412. cur, err := db.Get(key)
  413. if backend.IsNotFound(err) {
  414. return 0, nil
  415. } else if err != nil {
  416. return 0, err
  417. }
  418. var id protocol.IndexID
  419. if err := id.Unmarshal(cur); err != nil {
  420. return 0, nil
  421. }
  422. return id, nil
  423. }
  424. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  425. bs, _ := id.Marshal() // marshalling can't fail
  426. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  427. if err != nil {
  428. return err
  429. }
  430. return db.Put(key, bs)
  431. }
  432. func (db *Lowlevel) dropMtimes(folder []byte) error {
  433. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  434. if err != nil {
  435. return err
  436. }
  437. return db.dropPrefix(key)
  438. }
  439. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  440. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  441. if err != nil {
  442. return err
  443. }
  444. return db.dropPrefix(key)
  445. }
  446. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  447. t, err := db.newReadWriteTransaction()
  448. if err != nil {
  449. return err
  450. }
  451. defer t.close()
  452. if err := t.deleteKeyPrefix(prefix); err != nil {
  453. return err
  454. }
  455. return t.Commit()
  456. }
  457. func (db *Lowlevel) gcRunner(ctx context.Context) {
  458. // Calculate the time for the next GC run. Even if we should run GC
  459. // directly, give the system a while to get up and running and do other
  460. // stuff first. (We might have migrations and stuff which would be
  461. // better off running before GC.)
  462. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  463. if next < time.Minute {
  464. next = time.Minute
  465. }
  466. t := time.NewTimer(next)
  467. defer t.Stop()
  468. for {
  469. select {
  470. case <-ctx.Done():
  471. return
  472. case <-t.C:
  473. if err := db.gcIndirect(ctx); err != nil {
  474. l.Warnln("Database indirection GC failed:", err)
  475. }
  476. db.recordTime(indirectGCTimeKey)
  477. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  478. }
  479. }
  480. }
  481. // recordTime records the current time under the given key, affecting the
  482. // next call to timeUntil with the same key.
  483. func (db *Lowlevel) recordTime(key string) {
  484. miscDB := NewMiscDataNamespace(db)
  485. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  486. }
  487. // timeUntil returns how long we should wait until the next interval, or
  488. // zero if it should happen directly.
  489. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  490. miscDB := NewMiscDataNamespace(db)
  491. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  492. nextTime := time.Unix(lastTime, 0).Add(every)
  493. sleepTime := time.Until(nextTime)
  494. if sleepTime < 0 {
  495. sleepTime = 0
  496. }
  497. return sleepTime
  498. }
  499. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  500. // The indirection GC uses bloom filters to track used block lists and
  501. // versions. This means iterating over all items, adding their hashes to
  502. // the filter, then iterating over the indirected items and removing
  503. // those that don't match the filter. The filter will give false
  504. // positives so we will keep around one percent of things that we don't
  505. // really need (at most).
  506. //
  507. // Indirection GC needs to run when there are no modifications to the
  508. // FileInfos or indirected items.
  509. db.gcMut.Lock()
  510. defer db.gcMut.Unlock()
  511. t, err := db.newReadWriteTransaction()
  512. if err != nil {
  513. return err
  514. }
  515. defer t.Release()
  516. // Set up the bloom filters with the initial capacity and false positive
  517. // rate, or higher capacity if we've done this before and seen lots of
  518. // items. For simplicity's sake we track just one count, which is the
  519. // highest of the various indirected items.
  520. capacity := indirectGCBloomCapacity
  521. if db.gcKeyCount > capacity {
  522. capacity = db.gcKeyCount
  523. }
  524. blockFilter := blobloom.NewOptimized(blobloom.Config{
  525. FPRate: indirectGCBloomFalsePositiveRate,
  526. MaxBits: 8 * indirectGCBloomMaxBytes,
  527. NKeys: uint64(capacity),
  528. })
  529. // Iterate the FileInfos, unmarshal the block and version hashes and
  530. // add them to the filter.
  531. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  532. if err != nil {
  533. return err
  534. }
  535. defer it.Release()
  536. for it.Next() {
  537. select {
  538. case <-ctx.Done():
  539. return ctx.Err()
  540. default:
  541. }
  542. var bl BlocksHashOnly
  543. if err := bl.Unmarshal(it.Value()); err != nil {
  544. return err
  545. }
  546. if len(bl.BlocksHash) > 0 {
  547. blockFilter.Add64(bloomHash(bl.BlocksHash))
  548. }
  549. }
  550. it.Release()
  551. if err := it.Error(); err != nil {
  552. return err
  553. }
  554. // Iterate over block lists, removing keys with hashes that don't match
  555. // the filter.
  556. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  557. if err != nil {
  558. return err
  559. }
  560. defer it.Release()
  561. matchedBlocks := 0
  562. for it.Next() {
  563. select {
  564. case <-ctx.Done():
  565. return ctx.Err()
  566. default:
  567. }
  568. key := blockListKey(it.Key())
  569. if blockFilter.Has64(bloomHash(key)) {
  570. matchedBlocks++
  571. continue
  572. }
  573. if err := t.Delete(key); err != nil {
  574. return err
  575. }
  576. }
  577. it.Release()
  578. if err := it.Error(); err != nil {
  579. return err
  580. }
  581. // Remember the number of unique keys we kept until the next pass.
  582. db.gcKeyCount = matchedBlocks
  583. if err := t.Commit(); err != nil {
  584. return err
  585. }
  586. return db.Compact()
  587. }
  588. // Hash function for the bloomfilter: first eight bytes of the SHA-256.
  589. // Big or little-endian makes no difference, as long as we're consistent.
  590. func bloomHash(key blockListKey) uint64 {
  591. return binary.BigEndian.Uint64(key.BlocksHash())
  592. }
  593. // CheckRepair checks folder metadata and sequences for miscellaneous errors.
  594. func (db *Lowlevel) CheckRepair() {
  595. for _, folder := range db.ListFolders() {
  596. _ = db.getMetaAndCheck(folder)
  597. }
  598. }
  599. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  600. db.gcMut.RLock()
  601. defer db.gcMut.RUnlock()
  602. meta, err := db.recalcMeta(folder)
  603. if err == nil {
  604. var fixed int
  605. fixed, err = db.repairSequenceGCLocked(folder, meta)
  606. if fixed != 0 {
  607. l.Infof("Repaired %d sequence entries in database", fixed)
  608. }
  609. }
  610. if backend.IsClosed(err) {
  611. return nil
  612. } else if err != nil {
  613. panic(err)
  614. }
  615. return meta
  616. }
  617. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  618. meta := newMetadataTracker()
  619. if err := meta.fromDB(db, []byte(folder)); err != nil {
  620. l.Infof("No stored folder metadata for %q; recalculating", folder)
  621. return db.getMetaAndCheck(folder)
  622. }
  623. curSeq := meta.Sequence(protocol.LocalDeviceID)
  624. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  625. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  626. return db.getMetaAndCheck(folder)
  627. }
  628. if age := time.Since(meta.Created()); age > db.recheckInterval {
  629. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
  630. return db.getMetaAndCheck(folder)
  631. }
  632. return meta
  633. }
  634. func (db *Lowlevel) recalcMeta(folder string) (*metadataTracker, error) {
  635. meta := newMetadataTracker()
  636. if err := db.checkGlobals([]byte(folder), meta); err != nil {
  637. return nil, err
  638. }
  639. t, err := db.newReadWriteTransaction()
  640. if err != nil {
  641. return nil, err
  642. }
  643. defer t.close()
  644. var deviceID protocol.DeviceID
  645. err = t.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
  646. copy(deviceID[:], device)
  647. meta.addFile(deviceID, f)
  648. return true
  649. })
  650. if err != nil {
  651. return nil, err
  652. }
  653. meta.SetCreated()
  654. if err := meta.toDB(t, []byte(folder)); err != nil {
  655. return nil, err
  656. }
  657. if err := t.Commit(); err != nil {
  658. return nil, err
  659. }
  660. return meta, nil
  661. }
  662. // Verify the local sequence number from actual sequence entries. Returns
  663. // true if it was all good, or false if a fixup was necessary.
  664. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  665. // Walk the sequence index from the current (supposedly) highest
  666. // sequence number and raise the alarm if we get anything. This recovers
  667. // from the occasion where we have written sequence entries to disk but
  668. // not yet written new metadata to disk.
  669. //
  670. // Note that we can have the same thing happen for remote devices but
  671. // there it's not a problem -- we'll simply advertise a lower sequence
  672. // number than we've actually seen and receive some duplicate updates
  673. // and then be in sync again.
  674. t, err := db.newReadOnlyTransaction()
  675. if err != nil {
  676. panic(err)
  677. }
  678. ok := true
  679. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi FileIntf) bool {
  680. ok = false // we got something, which we should not have
  681. return false
  682. }); err != nil && !backend.IsClosed(err) {
  683. panic(err)
  684. }
  685. t.close()
  686. return ok
  687. }
// repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
// match those in the corresponding file entries. It returns the amount of fixed
// entries.
//
// The repair is done in two passes within one transaction: first all local
// file entries are walked and those without a sequence entry pointing back
// at them get a new sequence number, then all sequence entries are walked
// and those not matching the file they point at are deleted. The metadata
// is written at the checkpoints and once more before the commit. On error
// the returned count is zero.
// NOTE(review): going by the name the caller is expected to hold the GC
// lock, as getMetaAndCheck does -- confirm for any other callers.
func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
	t, err := db.newReadWriteTransaction()
	if err != nil {
		return 0, err
	}
	defer t.close()
	fixed := 0
	folder := []byte(folderStr)
	// First check that every file entry has a matching sequence entry
	// (this was previously db schema upgrade to 9).
	dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
	if err != nil {
		return 0, err
	}
	it, err := t.NewPrefixIterator(dk.WithoutName())
	if err != nil {
		return 0, err
	}
	defer it.Release()
	// Sequence key buffer, reused between iterations.
	var sk sequenceKey
	for it.Next() {
		intf, err := t.unmarshalTrunc(it.Value(), true)
		if err != nil {
			return 0, err
		}
		fi := intf.(FileInfoTruncated)
		if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
			return 0, err
		}
		// Look up what the sequence entry of this file points at; dk is
		// reused to hold the result.
		switch dk, err = t.Get(sk); {
		case err != nil:
			if !backend.IsNotFound(err) {
				return 0, err
			}
			// A missing sequence entry is repaired like a mismatching one.
			fallthrough
		case !bytes.Equal(it.Key(), dk):
			// Give the file a new sequence number and store both the
			// sequence entry and the updated file.
			fixed++
			fi.Sequence = meta.nextLocalSeq()
			if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
				return 0, err
			}
			if err := t.Put(sk, it.Key()); err != nil {
				return 0, err
			}
			if err := t.putFile(it.Key(), fi.copyToFileInfo(), true); err != nil {
				return 0, err
			}
		}
		if err := t.Checkpoint(func() error {
			return meta.toDB(t, folder)
		}); err != nil {
			return 0, err
		}
	}
	if err := it.Error(); err != nil {
		return 0, err
	}
	it.Release()
	// Secondly check there's no sequence entries pointing at incorrect things.
	sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
	if err != nil {
		return 0, err
	}
	it, err = t.NewPrefixIterator(sk.WithoutSequence())
	if err != nil {
		return 0, err
	}
	defer it.Release()
	for it.Next() {
		// Check that the sequence from the key matches the
		// sequence in the file.
		fi, ok, err := t.getFileTrunc(it.Value(), true)
		if err != nil {
			return 0, err
		}
		if ok {
			if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
				continue
			}
		}
		// Either the file is missing or has a different sequence number
		fixed++
		if err := t.Delete(it.Key()); err != nil {
			return 0, err
		}
	}
	if err := it.Error(); err != nil {
		return 0, err
	}
	it.Release()
	if err := meta.toDB(t, folder); err != nil {
		return 0, err
	}
	return fixed, t.Commit()
}
  786. // unchanged checks if two files are the same and thus don't need to be updated.
  787. // Local flags or the invalid bit might change without the version
  788. // being bumped.
  789. func unchanged(nf, ef FileIntf) bool {
  790. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  791. }