lowlevel.go 22 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/db/backend"
  13. "github.com/syncthing/syncthing/lib/protocol"
  14. "github.com/syncthing/syncthing/lib/sync"
  15. "github.com/syncthing/syncthing/lib/util"
  16. "github.com/thejerf/suture"
  17. "github.com/willf/bloom"
  18. )
  19. const (
  20. // We set the bloom filter capacity to handle 100k individual items with
  21. // a false positive probability of 1% for the first pass. Once we know
  22. // how many items we have we will use that number instead, if it's more
  23. // than 100k. For fewer than 100k items we will just get better false
  24. // positive rate instead.
  25. indirectGCBloomCapacity = 100000
  26. indirectGCBloomFalsePositiveRate = 0.01 // 1%
  27. indirectGCDefaultInterval = 13 * time.Hour
  28. indirectGCTimeKey = "lastIndirectGCTime"
  29. // Use indirection for the block list when it exceeds this many entries
  30. blocksIndirectionCutoff = 3
  31. recheckDefaultInterval = 30 * 24 * time.Hour
  32. )
  33. // Lowlevel is the lowest level database interface. It has a very simple
  34. // purpose: hold the actual backend database, and the in-memory state
  35. // that belong to that database. In the same way that a single on disk
  36. // database can only be opened once, there should be only one Lowlevel for
  37. // any given backend.
  38. type Lowlevel struct {
  39. *suture.Supervisor
  40. backend.Backend
  41. folderIdx *smallIndex
  42. deviceIdx *smallIndex
  43. keyer keyer
  44. gcMut sync.RWMutex
  45. gcKeyCount int
  46. indirectGCInterval time.Duration
  47. recheckInterval time.Duration
  48. }
  49. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  50. db := &Lowlevel{
  51. Supervisor: suture.New("db.Lowlevel", suture.Spec{
  52. // Only log restarts in debug mode.
  53. Log: func(line string) {
  54. l.Debugln(line)
  55. },
  56. PassThroughPanics: true,
  57. }),
  58. Backend: backend,
  59. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  60. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  61. gcMut: sync.NewRWMutex(),
  62. indirectGCInterval: indirectGCDefaultInterval,
  63. recheckInterval: recheckDefaultInterval,
  64. }
  65. for _, opt := range opts {
  66. opt(db)
  67. }
  68. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  69. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  70. return db
  71. }
  72. type Option func(*Lowlevel)
  73. // WithRecheckInterval sets the time interval in between metadata recalculations
  74. // and consistency checks.
  75. func WithRecheckInterval(dur time.Duration) Option {
  76. return func(db *Lowlevel) {
  77. if dur > 0 {
  78. db.recheckInterval = dur
  79. }
  80. }
  81. }
  82. // WithIndirectGCInterval sets the time interval in between GC runs.
  83. func WithIndirectGCInterval(dur time.Duration) Option {
  84. return func(db *Lowlevel) {
  85. if dur > 0 {
  86. db.indirectGCInterval = dur
  87. }
  88. }
  89. }
  90. // ListFolders returns the list of folders currently in the database
  91. func (db *Lowlevel) ListFolders() []string {
  92. return db.folderIdx.Values()
  93. }
  94. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  95. // global versionlist and metadata.
  96. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  97. db.gcMut.RLock()
  98. defer db.gcMut.RUnlock()
  99. t, err := db.newReadWriteTransaction()
  100. if err != nil {
  101. return err
  102. }
  103. defer t.close()
  104. var dk, gk, keyBuf []byte
  105. devID := protocol.DeviceIDFromBytes(device)
  106. for _, f := range fs {
  107. name := []byte(f.Name)
  108. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  109. if err != nil {
  110. return err
  111. }
  112. ef, ok, err := t.getFileTrunc(dk, true)
  113. if err != nil {
  114. return err
  115. }
  116. if ok && unchanged(f, ef) {
  117. continue
  118. }
  119. if ok {
  120. meta.removeFile(devID, ef)
  121. }
  122. meta.addFile(devID, f)
  123. l.Debugf("insert; folder=%q device=%v %v", folder, devID, f)
  124. if err := t.putFile(dk, f, false); err != nil {
  125. return err
  126. }
  127. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  128. if err != nil {
  129. return err
  130. }
  131. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  132. if err != nil {
  133. return err
  134. }
  135. if err := t.Checkpoint(func() error {
  136. return meta.toDB(t, folder)
  137. }); err != nil {
  138. return err
  139. }
  140. }
  141. if err := meta.toDB(t, folder); err != nil {
  142. return err
  143. }
  144. return t.Commit()
  145. }
  146. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  147. // metadata, sequence and blockmap buckets.
  148. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  149. db.gcMut.RLock()
  150. defer db.gcMut.RUnlock()
  151. t, err := db.newReadWriteTransaction()
  152. if err != nil {
  153. return err
  154. }
  155. defer t.close()
  156. var dk, gk, keyBuf []byte
  157. blockBuf := make([]byte, 4)
  158. for _, f := range fs {
  159. name := []byte(f.Name)
  160. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  161. if err != nil {
  162. return err
  163. }
  164. ef, ok, err := t.getFileByKey(dk)
  165. if err != nil {
  166. return err
  167. }
  168. if ok && unchanged(f, ef) {
  169. continue
  170. }
  171. if ok {
  172. if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
  173. for _, block := range ef.Blocks {
  174. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  175. if err != nil {
  176. return err
  177. }
  178. if err := t.Delete(keyBuf); err != nil {
  179. return err
  180. }
  181. }
  182. }
  183. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  184. if err != nil {
  185. return err
  186. }
  187. if err := t.Delete(keyBuf); err != nil {
  188. return err
  189. }
  190. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  191. }
  192. f.Sequence = meta.nextLocalSeq()
  193. if ok {
  194. meta.removeFile(protocol.LocalDeviceID, ef)
  195. }
  196. meta.addFile(protocol.LocalDeviceID, f)
  197. l.Debugf("insert (local); folder=%q %v", folder, f)
  198. if err := t.putFile(dk, f, false); err != nil {
  199. return err
  200. }
  201. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  202. if err != nil {
  203. return err
  204. }
  205. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  206. if err != nil {
  207. return err
  208. }
  209. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  210. if err != nil {
  211. return err
  212. }
  213. if err := t.Put(keyBuf, dk); err != nil {
  214. return err
  215. }
  216. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  217. if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() {
  218. for i, block := range f.Blocks {
  219. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  220. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  221. if err != nil {
  222. return err
  223. }
  224. if err := t.Put(keyBuf, blockBuf); err != nil {
  225. return err
  226. }
  227. }
  228. }
  229. if err := t.Checkpoint(func() error {
  230. return meta.toDB(t, folder)
  231. }); err != nil {
  232. return err
  233. }
  234. }
  235. if err := meta.toDB(t, folder); err != nil {
  236. return err
  237. }
  238. return t.Commit()
  239. }
  240. func (db *Lowlevel) dropFolder(folder []byte) error {
  241. db.gcMut.RLock()
  242. defer db.gcMut.RUnlock()
  243. t, err := db.newReadWriteTransaction()
  244. if err != nil {
  245. return err
  246. }
  247. defer t.close()
  248. // Remove all items related to the given folder from the device->file bucket
  249. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  250. if err != nil {
  251. return err
  252. }
  253. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  254. return err
  255. }
  256. // Remove all sequences related to the folder
  257. k1, err := db.keyer.GenerateSequenceKey(nil, folder, 0)
  258. if err != nil {
  259. return err
  260. }
  261. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  262. return err
  263. }
  264. // Remove all items related to the given folder from the global bucket
  265. k2, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  266. if err != nil {
  267. return err
  268. }
  269. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  270. return err
  271. }
  272. // Remove all needs related to the folder
  273. k3, err := db.keyer.GenerateNeedFileKey(nil, folder, nil)
  274. if err != nil {
  275. return err
  276. }
  277. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  278. return err
  279. }
  280. // Remove the blockmap of the folder
  281. k4, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  282. if err != nil {
  283. return err
  284. }
  285. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  286. return err
  287. }
  288. return t.Commit()
  289. }
  290. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  291. db.gcMut.RLock()
  292. defer db.gcMut.RUnlock()
  293. t, err := db.newReadWriteTransaction()
  294. if err != nil {
  295. return err
  296. }
  297. defer t.close()
  298. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  299. if err != nil {
  300. return err
  301. }
  302. dbi, err := t.NewPrefixIterator(key)
  303. if err != nil {
  304. return err
  305. }
  306. var gk, keyBuf []byte
  307. for dbi.Next() {
  308. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  309. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  310. if err != nil {
  311. return err
  312. }
  313. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  314. if err != nil {
  315. return err
  316. }
  317. if err := t.Delete(dbi.Key()); err != nil {
  318. return err
  319. }
  320. if err := t.Checkpoint(); err != nil {
  321. return err
  322. }
  323. }
  324. if err := dbi.Error(); err != nil {
  325. return err
  326. }
  327. dbi.Release()
  328. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  329. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  330. if err != nil {
  331. return err
  332. }
  333. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  334. return err
  335. }
  336. }
  337. return t.Commit()
  338. }
  339. func (db *Lowlevel) checkGlobals(folder []byte, meta *metadataTracker) error {
  340. t, err := db.newReadWriteTransaction()
  341. if err != nil {
  342. return err
  343. }
  344. defer t.close()
  345. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  346. if err != nil {
  347. return err
  348. }
  349. dbi, err := t.NewPrefixIterator(key.WithoutName())
  350. if err != nil {
  351. return err
  352. }
  353. defer dbi.Release()
  354. var dk []byte
  355. for dbi.Next() {
  356. var vl VersionList
  357. if err := vl.Unmarshal(dbi.Value()); err != nil || len(vl.Versions) == 0 {
  358. if err := t.Delete(dbi.Key()); err != nil {
  359. return err
  360. }
  361. continue
  362. }
  363. // Check the global version list for consistency. An issue in previous
  364. // versions of goleveldb could result in reordered writes so that
  365. // there are global entries pointing to no longer existing files. Here
  366. // we find those and clear them out.
  367. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  368. var newVL VersionList
  369. for i, version := range vl.Versions {
  370. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, version.Device, name)
  371. if err != nil {
  372. return err
  373. }
  374. _, err := t.Get(dk)
  375. if backend.IsNotFound(err) {
  376. continue
  377. }
  378. if err != nil {
  379. return err
  380. }
  381. newVL.Versions = append(newVL.Versions, version)
  382. if i == 0 {
  383. if fi, ok, err := t.getFileTrunc(dk, true); err != nil {
  384. return err
  385. } else if ok {
  386. meta.addFile(protocol.GlobalDeviceID, fi)
  387. }
  388. }
  389. }
  390. if newLen := len(newVL.Versions); newLen == 0 {
  391. if err := t.Delete(dbi.Key()); err != nil {
  392. return err
  393. }
  394. } else if newLen != len(vl.Versions) {
  395. if err := t.Put(dbi.Key(), mustMarshal(&newVL)); err != nil {
  396. return err
  397. }
  398. }
  399. }
  400. if err := dbi.Error(); err != nil {
  401. return err
  402. }
  403. l.Debugf("db check completed for %q", folder)
  404. return t.Commit()
  405. }
  406. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  407. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  408. if err != nil {
  409. return 0, err
  410. }
  411. cur, err := db.Get(key)
  412. if backend.IsNotFound(err) {
  413. return 0, nil
  414. } else if err != nil {
  415. return 0, err
  416. }
  417. var id protocol.IndexID
  418. if err := id.Unmarshal(cur); err != nil {
  419. return 0, nil
  420. }
  421. return id, nil
  422. }
  423. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  424. bs, _ := id.Marshal() // marshalling can't fail
  425. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  426. if err != nil {
  427. return err
  428. }
  429. return db.Put(key, bs)
  430. }
  431. func (db *Lowlevel) dropMtimes(folder []byte) error {
  432. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  433. if err != nil {
  434. return err
  435. }
  436. return db.dropPrefix(key)
  437. }
  438. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  439. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  440. if err != nil {
  441. return err
  442. }
  443. return db.dropPrefix(key)
  444. }
  445. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  446. t, err := db.newReadWriteTransaction()
  447. if err != nil {
  448. return err
  449. }
  450. defer t.close()
  451. if err := t.deleteKeyPrefix(prefix); err != nil {
  452. return err
  453. }
  454. return t.Commit()
  455. }
  456. func (db *Lowlevel) gcRunner(ctx context.Context) {
  457. // Calculate the time for the next GC run. Even if we should run GC
  458. // directly, give the system a while to get up and running and do other
  459. // stuff first. (We might have migrations and stuff which would be
  460. // better off running before GC.)
  461. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  462. if next < time.Minute {
  463. next = time.Minute
  464. }
  465. t := time.NewTimer(next)
  466. defer t.Stop()
  467. for {
  468. select {
  469. case <-ctx.Done():
  470. return
  471. case <-t.C:
  472. if err := db.gcIndirect(ctx); err != nil {
  473. l.Warnln("Database indirection GC failed:", err)
  474. }
  475. db.recordTime(indirectGCTimeKey)
  476. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  477. }
  478. }
  479. }
  480. // recordTime records the current time under the given key, affecting the
  481. // next call to timeUntil with the same key.
  482. func (db *Lowlevel) recordTime(key string) {
  483. miscDB := NewMiscDataNamespace(db)
  484. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  485. }
  486. // timeUntil returns how long we should wait until the next interval, or
  487. // zero if it should happen directly.
  488. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  489. miscDB := NewMiscDataNamespace(db)
  490. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  491. nextTime := time.Unix(lastTime, 0).Add(every)
  492. sleepTime := time.Until(nextTime)
  493. if sleepTime < 0 {
  494. sleepTime = 0
  495. }
  496. return sleepTime
  497. }
  498. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  499. // The indirection GC uses bloom filters to track used block lists and
  500. // versions. This means iterating over all items, adding their hashes to
  501. // the filter, then iterating over the indirected items and removing
  502. // those that don't match the filter. The filter will give false
  503. // positives so we will keep around one percent of things that we don't
  504. // really need (at most).
  505. //
  506. // Indirection GC needs to run when there are no modifications to the
  507. // FileInfos or indirected items.
  508. db.gcMut.Lock()
  509. defer db.gcMut.Unlock()
  510. t, err := db.newReadWriteTransaction()
  511. if err != nil {
  512. return err
  513. }
  514. defer t.Release()
  515. // Set up the bloom filters with the initial capacity and false positive
  516. // rate, or higher capacity if we've done this before and seen lots of
  517. // items. For simplicity's sake we track just one count, which is the
  518. // highest of the various indirected items.
  519. capacity := indirectGCBloomCapacity
  520. if db.gcKeyCount > capacity {
  521. capacity = db.gcKeyCount
  522. }
  523. blockFilter := bloom.NewWithEstimates(uint(capacity), indirectGCBloomFalsePositiveRate)
  524. // Iterate the FileInfos, unmarshal the block and version hashes and
  525. // add them to the filter.
  526. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  527. if err != nil {
  528. return err
  529. }
  530. defer it.Release()
  531. for it.Next() {
  532. select {
  533. case <-ctx.Done():
  534. return ctx.Err()
  535. default:
  536. }
  537. var bl BlocksHashOnly
  538. if err := bl.Unmarshal(it.Value()); err != nil {
  539. return err
  540. }
  541. if len(bl.BlocksHash) > 0 {
  542. blockFilter.Add(bl.BlocksHash)
  543. }
  544. }
  545. it.Release()
  546. if err := it.Error(); err != nil {
  547. return err
  548. }
  549. // Iterate over block lists, removing keys with hashes that don't match
  550. // the filter.
  551. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  552. if err != nil {
  553. return err
  554. }
  555. defer it.Release()
  556. matchedBlocks := 0
  557. for it.Next() {
  558. select {
  559. case <-ctx.Done():
  560. return ctx.Err()
  561. default:
  562. }
  563. key := blockListKey(it.Key())
  564. if blockFilter.Test(key.BlocksHash()) {
  565. matchedBlocks++
  566. continue
  567. }
  568. if err := t.Delete(key); err != nil {
  569. return err
  570. }
  571. }
  572. it.Release()
  573. if err := it.Error(); err != nil {
  574. return err
  575. }
  576. // Remember the number of unique keys we kept until the next pass.
  577. db.gcKeyCount = matchedBlocks
  578. if err := t.Commit(); err != nil {
  579. return err
  580. }
  581. return db.Compact()
  582. }
  583. // CheckRepair checks folder metadata and sequences for miscellaneous errors.
  584. func (db *Lowlevel) CheckRepair() {
  585. for _, folder := range db.ListFolders() {
  586. _ = db.getMetaAndCheck(folder)
  587. }
  588. }
  589. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  590. db.gcMut.RLock()
  591. defer db.gcMut.RUnlock()
  592. meta, err := db.recalcMeta(folder)
  593. if err == nil {
  594. var fixed int
  595. fixed, err = db.repairSequenceGCLocked(folder, meta)
  596. if fixed != 0 {
  597. l.Infof("Repaired %d sequence entries in database", fixed)
  598. }
  599. }
  600. if backend.IsClosed(err) {
  601. return nil
  602. } else if err != nil {
  603. panic(err)
  604. }
  605. return meta
  606. }
  607. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  608. meta := newMetadataTracker()
  609. if err := meta.fromDB(db, []byte(folder)); err != nil {
  610. l.Infof("No stored folder metadata for %q; recalculating", folder)
  611. return db.getMetaAndCheck(folder)
  612. }
  613. curSeq := meta.Sequence(protocol.LocalDeviceID)
  614. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  615. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  616. return db.getMetaAndCheck(folder)
  617. }
  618. if age := time.Since(meta.Created()); age > db.recheckInterval {
  619. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
  620. return db.getMetaAndCheck(folder)
  621. }
  622. return meta
  623. }
  624. func (db *Lowlevel) recalcMeta(folder string) (*metadataTracker, error) {
  625. meta := newMetadataTracker()
  626. if err := db.checkGlobals([]byte(folder), meta); err != nil {
  627. return nil, err
  628. }
  629. t, err := db.newReadWriteTransaction()
  630. if err != nil {
  631. return nil, err
  632. }
  633. defer t.close()
  634. var deviceID protocol.DeviceID
  635. err = t.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
  636. copy(deviceID[:], device)
  637. meta.addFile(deviceID, f)
  638. return true
  639. })
  640. if err != nil {
  641. return nil, err
  642. }
  643. meta.SetCreated()
  644. if err := meta.toDB(t, []byte(folder)); err != nil {
  645. return nil, err
  646. }
  647. if err := t.Commit(); err != nil {
  648. return nil, err
  649. }
  650. return meta, nil
  651. }
  652. // Verify the local sequence number from actual sequence entries. Returns
  653. // true if it was all good, or false if a fixup was necessary.
  654. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  655. // Walk the sequence index from the current (supposedly) highest
  656. // sequence number and raise the alarm if we get anything. This recovers
  657. // from the occasion where we have written sequence entries to disk but
  658. // not yet written new metadata to disk.
  659. //
  660. // Note that we can have the same thing happen for remote devices but
  661. // there it's not a problem -- we'll simply advertise a lower sequence
  662. // number than we've actually seen and receive some duplicate updates
  663. // and then be in sync again.
  664. t, err := db.newReadOnlyTransaction()
  665. if err != nil {
  666. panic(err)
  667. }
  668. ok := true
  669. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi FileIntf) bool {
  670. ok = false // we got something, which we should not have
  671. return false
  672. }); err != nil && !backend.IsClosed(err) {
  673. panic(err)
  674. }
  675. t.close()
  676. return ok
  677. }
  678. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  679. // match those in the corresponding file entries. It returns the amount of fixed
  680. // entries.
  681. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  682. t, err := db.newReadWriteTransaction()
  683. if err != nil {
  684. return 0, err
  685. }
  686. defer t.close()
  687. fixed := 0
  688. folder := []byte(folderStr)
  689. // First check that every file entry has a matching sequence entry
  690. // (this was previously db schema upgrade to 9).
  691. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  692. if err != nil {
  693. return 0, err
  694. }
  695. it, err := t.NewPrefixIterator(dk.WithoutName())
  696. if err != nil {
  697. return 0, err
  698. }
  699. defer it.Release()
  700. var sk sequenceKey
  701. for it.Next() {
  702. intf, err := t.unmarshalTrunc(it.Value(), true)
  703. if err != nil {
  704. return 0, err
  705. }
  706. fi := intf.(FileInfoTruncated)
  707. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  708. return 0, err
  709. }
  710. switch dk, err = t.Get(sk); {
  711. case err != nil:
  712. if !backend.IsNotFound(err) {
  713. return 0, err
  714. }
  715. fallthrough
  716. case !bytes.Equal(it.Key(), dk):
  717. fixed++
  718. fi.Sequence = meta.nextLocalSeq()
  719. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  720. return 0, err
  721. }
  722. if err := t.Put(sk, it.Key()); err != nil {
  723. return 0, err
  724. }
  725. if err := t.putFile(it.Key(), fi.copyToFileInfo(), true); err != nil {
  726. return 0, err
  727. }
  728. }
  729. if err := t.Checkpoint(func() error {
  730. return meta.toDB(t, folder)
  731. }); err != nil {
  732. return 0, err
  733. }
  734. }
  735. if err := it.Error(); err != nil {
  736. return 0, err
  737. }
  738. it.Release()
  739. // Secondly check there's no sequence entries pointing at incorrect things.
  740. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  741. if err != nil {
  742. return 0, err
  743. }
  744. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  745. if err != nil {
  746. return 0, err
  747. }
  748. defer it.Release()
  749. for it.Next() {
  750. // Check that the sequence from the key matches the
  751. // sequence in the file.
  752. fi, ok, err := t.getFileTrunc(it.Value(), true)
  753. if err != nil {
  754. return 0, err
  755. }
  756. if ok {
  757. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  758. continue
  759. }
  760. }
  761. // Either the file is missing or has a different sequence number
  762. fixed++
  763. if err := t.Delete(it.Key()); err != nil {
  764. return 0, err
  765. }
  766. }
  767. if err := it.Error(); err != nil {
  768. return 0, err
  769. }
  770. it.Release()
  771. if err := meta.toDB(t, folder); err != nil {
  772. return 0, err
  773. }
  774. return fixed, t.Commit()
  775. }
  776. // unchanged checks if two files are the same and thus don't need to be updated.
  777. // Local flags or the invalid bit might change without the version
  778. // being bumped.
  779. func unchanged(nf, ef FileIntf) bool {
  780. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  781. }