lowlevel.go 26 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "time"
  12. "github.com/greatroar/blobloom"
  13. "github.com/syncthing/syncthing/lib/db/backend"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sha256"
  16. "github.com/syncthing/syncthing/lib/sync"
  17. "github.com/syncthing/syncthing/lib/util"
  18. "github.com/thejerf/suture"
  19. )
const (
	// We set the bloom filter capacity to handle 100k individual items with
	// a false positive probability of 1% for the first pass. Once we know
	// how many items we have we will use that number instead, if it's more
	// than 100k. For fewer than 100k items we will just get better false
	// positive rate instead.
	indirectGCBloomCapacity          = 100000
	indirectGCBloomFalsePositiveRate = 0.01     // 1%
	indirectGCBloomMaxBytes          = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
	// Interval between indirection GC runs unless overridden with
	// WithIndirectGCInterval.
	indirectGCDefaultInterval = 13 * time.Hour
	// Key under which the time of the last indirection GC run is recorded
	// (see recordTime and timeUntil).
	indirectGCTimeKey = "lastIndirectGCTime"

	// Use indirection for the block list when it exceeds this many entries
	blocksIndirectionCutoff = 3
	// Use indirection for the version vector when it exceeds this many entries
	versionIndirectionCutoff = 10

	// Maximum age of stored folder metadata before loadMetadataTracker
	// recalculates it, unless overridden with WithRecheckInterval.
	recheckDefaultInterval = 30 * 24 * time.Hour
)
// Lowlevel is the lowest level database interface. It has a very simple
// purpose: hold the actual backend database, and the in-memory state
// that belongs to that database. In the same way that a single on disk
// database can only be opened once, there should be only one Lowlevel for
// any given backend.
type Lowlevel struct {
	*suture.Supervisor
	backend.Backend
	// folderIdx and deviceIdx are the small indexes created in NewLowlevel
	// over the KeyTypeFolderIdx and KeyTypeDeviceIdx key prefixes.
	folderIdx *smallIndex
	deviceIdx *smallIndex
	// keyer generates database keys; it is built from the two indexes above.
	keyer keyer
	// gcMut is read-locked by the update and drop operations and
	// write-locked by gcIndirect, so that the indirection GC runs without
	// concurrent modifications from those operations.
	gcMut sync.RWMutex
	// gcKeyCount is the number of indirected keys kept by the previous
	// gcIndirect pass; it is used to size the bloom filters on the next pass.
	gcKeyCount int
	// indirectGCInterval is the time between indirection GC runs.
	indirectGCInterval time.Duration
	// recheckInterval is the maximum accepted age of stored folder metadata.
	recheckInterval time.Duration
}
  53. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  54. db := &Lowlevel{
  55. Supervisor: suture.New("db.Lowlevel", suture.Spec{
  56. // Only log restarts in debug mode.
  57. Log: func(line string) {
  58. l.Debugln(line)
  59. },
  60. PassThroughPanics: true,
  61. }),
  62. Backend: backend,
  63. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  64. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  65. gcMut: sync.NewRWMutex(),
  66. indirectGCInterval: indirectGCDefaultInterval,
  67. recheckInterval: recheckDefaultInterval,
  68. }
  69. for _, opt := range opts {
  70. opt(db)
  71. }
  72. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  73. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  74. return db
  75. }
// Option is a functional option applied to a Lowlevel by NewLowlevel, after
// the defaults have been set.
type Option func(*Lowlevel)
  77. // WithRecheckInterval sets the time interval in between metadata recalculations
  78. // and consistency checks.
  79. func WithRecheckInterval(dur time.Duration) Option {
  80. return func(db *Lowlevel) {
  81. if dur > 0 {
  82. db.recheckInterval = dur
  83. }
  84. }
  85. }
  86. // WithIndirectGCInterval sets the time interval in between GC runs.
  87. func WithIndirectGCInterval(dur time.Duration) Option {
  88. return func(db *Lowlevel) {
  89. if dur > 0 {
  90. db.indirectGCInterval = dur
  91. }
  92. }
  93. }
  94. // ListFolders returns the list of folders currently in the database
  95. func (db *Lowlevel) ListFolders() []string {
  96. return db.folderIdx.Values()
  97. }
  98. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  99. // global versionlist and metadata.
  100. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  101. db.gcMut.RLock()
  102. defer db.gcMut.RUnlock()
  103. t, err := db.newReadWriteTransaction()
  104. if err != nil {
  105. return err
  106. }
  107. defer t.close()
  108. var dk, gk, keyBuf []byte
  109. devID, err := protocol.DeviceIDFromBytes(device)
  110. if err != nil {
  111. return err
  112. }
  113. for _, f := range fs {
  114. name := []byte(f.Name)
  115. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  116. if err != nil {
  117. return err
  118. }
  119. ef, ok, err := t.getFileTrunc(dk, true)
  120. if err != nil {
  121. return err
  122. }
  123. if ok && unchanged(f, ef) {
  124. continue
  125. }
  126. if ok {
  127. meta.removeFile(devID, ef)
  128. }
  129. meta.addFile(devID, f)
  130. l.Debugf("insert; folder=%q device=%v %v", folder, devID, f)
  131. if err := t.putFile(dk, f, false); err != nil {
  132. return err
  133. }
  134. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  135. if err != nil {
  136. return err
  137. }
  138. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  139. if err != nil {
  140. return err
  141. }
  142. if err := t.Checkpoint(func() error {
  143. return meta.toDB(t, folder)
  144. }); err != nil {
  145. return err
  146. }
  147. }
  148. if err := meta.toDB(t, folder); err != nil {
  149. return err
  150. }
  151. return t.Commit()
  152. }
  153. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  154. // metadata, sequence and blockmap buckets.
  155. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  156. db.gcMut.RLock()
  157. defer db.gcMut.RUnlock()
  158. t, err := db.newReadWriteTransaction()
  159. if err != nil {
  160. return err
  161. }
  162. defer t.close()
  163. var dk, gk, keyBuf []byte
  164. blockBuf := make([]byte, 4)
  165. for _, f := range fs {
  166. name := []byte(f.Name)
  167. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  168. if err != nil {
  169. return err
  170. }
  171. ef, ok, err := t.getFileByKey(dk)
  172. if err != nil {
  173. return err
  174. }
  175. if ok && unchanged(f, ef) {
  176. continue
  177. }
  178. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  179. if ok {
  180. if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
  181. for _, block := range ef.Blocks {
  182. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  183. if err != nil {
  184. return err
  185. }
  186. if err := t.Delete(keyBuf); err != nil {
  187. return err
  188. }
  189. }
  190. if !blocksHashSame {
  191. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  192. if err != nil {
  193. return err
  194. }
  195. if err = t.Delete(keyBuf); err != nil {
  196. return err
  197. }
  198. }
  199. }
  200. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  201. if err != nil {
  202. return err
  203. }
  204. if err := t.Delete(keyBuf); err != nil {
  205. return err
  206. }
  207. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  208. }
  209. f.Sequence = meta.nextLocalSeq()
  210. if ok {
  211. meta.removeFile(protocol.LocalDeviceID, ef)
  212. }
  213. meta.addFile(protocol.LocalDeviceID, f)
  214. l.Debugf("insert (local); folder=%q %v", folder, f)
  215. if err := t.putFile(dk, f, false); err != nil {
  216. return err
  217. }
  218. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  219. if err != nil {
  220. return err
  221. }
  222. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  223. if err != nil {
  224. return err
  225. }
  226. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  227. if err != nil {
  228. return err
  229. }
  230. if err := t.Put(keyBuf, dk); err != nil {
  231. return err
  232. }
  233. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  234. if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
  235. for i, block := range f.Blocks {
  236. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  237. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  238. if err != nil {
  239. return err
  240. }
  241. if err := t.Put(keyBuf, blockBuf); err != nil {
  242. return err
  243. }
  244. }
  245. if !blocksHashSame {
  246. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  247. if err != nil {
  248. return err
  249. }
  250. if err = t.Put(keyBuf, nil); err != nil {
  251. return err
  252. }
  253. }
  254. }
  255. if err := t.Checkpoint(func() error {
  256. return meta.toDB(t, folder)
  257. }); err != nil {
  258. return err
  259. }
  260. }
  261. if err := meta.toDB(t, folder); err != nil {
  262. return err
  263. }
  264. return t.Commit()
  265. }
  266. func (db *Lowlevel) dropFolder(folder []byte) error {
  267. db.gcMut.RLock()
  268. defer db.gcMut.RUnlock()
  269. t, err := db.newReadWriteTransaction()
  270. if err != nil {
  271. return err
  272. }
  273. defer t.close()
  274. // Remove all items related to the given folder from the device->file bucket
  275. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  276. if err != nil {
  277. return err
  278. }
  279. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  280. return err
  281. }
  282. // Remove all sequences related to the folder
  283. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  284. if err != nil {
  285. return err
  286. }
  287. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  288. return err
  289. }
  290. // Remove all items related to the given folder from the global bucket
  291. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  292. if err != nil {
  293. return err
  294. }
  295. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  296. return err
  297. }
  298. // Remove all needs related to the folder
  299. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  300. if err != nil {
  301. return err
  302. }
  303. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  304. return err
  305. }
  306. // Remove the blockmap of the folder
  307. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  308. if err != nil {
  309. return err
  310. }
  311. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  312. return err
  313. }
  314. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  315. if err != nil {
  316. return err
  317. }
  318. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  319. return err
  320. }
  321. return t.Commit()
  322. }
  323. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  324. db.gcMut.RLock()
  325. defer db.gcMut.RUnlock()
  326. t, err := db.newReadWriteTransaction()
  327. if err != nil {
  328. return err
  329. }
  330. defer t.close()
  331. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  332. if err != nil {
  333. return err
  334. }
  335. dbi, err := t.NewPrefixIterator(key)
  336. if err != nil {
  337. return err
  338. }
  339. defer dbi.Release()
  340. var gk, keyBuf []byte
  341. for dbi.Next() {
  342. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  343. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  344. if err != nil {
  345. return err
  346. }
  347. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  348. if err != nil {
  349. return err
  350. }
  351. if err := t.Delete(dbi.Key()); err != nil {
  352. return err
  353. }
  354. if err := t.Checkpoint(); err != nil {
  355. return err
  356. }
  357. }
  358. dbi.Release()
  359. if err := dbi.Error(); err != nil {
  360. return err
  361. }
  362. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  363. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  364. if err != nil {
  365. return err
  366. }
  367. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  368. return err
  369. }
  370. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  371. if err != nil {
  372. return err
  373. }
  374. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  375. return err
  376. }
  377. }
  378. return t.Commit()
  379. }
  380. func (db *Lowlevel) checkGlobals(folder []byte) error {
  381. t, err := db.newReadWriteTransaction()
  382. if err != nil {
  383. return err
  384. }
  385. defer t.close()
  386. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  387. if err != nil {
  388. return err
  389. }
  390. dbi, err := t.NewPrefixIterator(key.WithoutName())
  391. if err != nil {
  392. return err
  393. }
  394. defer dbi.Release()
  395. var dk []byte
  396. ro := t.readOnlyTransaction
  397. for dbi.Next() {
  398. var vl VersionList
  399. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  400. if err := t.Delete(dbi.Key()); err != nil {
  401. return err
  402. }
  403. continue
  404. }
  405. // Check the global version list for consistency. An issue in previous
  406. // versions of goleveldb could result in reordered writes so that
  407. // there are global entries pointing to no longer existing files. Here
  408. // we find those and clear them out.
  409. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  410. newVL := &VersionList{}
  411. var changed, changedHere bool
  412. for _, fv := range vl.RawVersions {
  413. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  414. if err != nil {
  415. return err
  416. }
  417. changed = changed || changedHere
  418. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  419. if err != nil {
  420. return err
  421. }
  422. changed = changed || changedHere
  423. }
  424. if newVL.Empty() {
  425. if err := t.Delete(dbi.Key()); err != nil {
  426. return err
  427. }
  428. } else if changed {
  429. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  430. return err
  431. }
  432. }
  433. }
  434. dbi.Release()
  435. if err := dbi.Error(); err != nil {
  436. return err
  437. }
  438. l.Debugf("db check completed for %q", folder)
  439. return t.Commit()
  440. }
  441. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  442. var changed bool
  443. var err error
  444. for _, device := range devices {
  445. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  446. if err != nil {
  447. return false, err
  448. }
  449. f, ok, err := t.getFileTrunc(dk, true)
  450. if err != nil {
  451. return false, err
  452. }
  453. if !ok {
  454. changed = true
  455. continue
  456. }
  457. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  458. if err != nil {
  459. return false, err
  460. }
  461. }
  462. return changed, nil
  463. }
  464. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  465. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  466. if err != nil {
  467. return 0, err
  468. }
  469. cur, err := db.Get(key)
  470. if backend.IsNotFound(err) {
  471. return 0, nil
  472. } else if err != nil {
  473. return 0, err
  474. }
  475. var id protocol.IndexID
  476. if err := id.Unmarshal(cur); err != nil {
  477. return 0, nil
  478. }
  479. return id, nil
  480. }
  481. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  482. bs, _ := id.Marshal() // marshalling can't fail
  483. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  484. if err != nil {
  485. return err
  486. }
  487. return db.Put(key, bs)
  488. }
  489. func (db *Lowlevel) dropMtimes(folder []byte) error {
  490. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  491. if err != nil {
  492. return err
  493. }
  494. return db.dropPrefix(key)
  495. }
  496. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  497. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  498. if err != nil {
  499. return err
  500. }
  501. return db.dropPrefix(key)
  502. }
  503. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  504. t, err := db.newReadWriteTransaction()
  505. if err != nil {
  506. return err
  507. }
  508. defer t.close()
  509. if err := t.deleteKeyPrefix(prefix); err != nil {
  510. return err
  511. }
  512. return t.Commit()
  513. }
  514. func (db *Lowlevel) gcRunner(ctx context.Context) {
  515. // Calculate the time for the next GC run. Even if we should run GC
  516. // directly, give the system a while to get up and running and do other
  517. // stuff first. (We might have migrations and stuff which would be
  518. // better off running before GC.)
  519. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  520. if next < time.Minute {
  521. next = time.Minute
  522. }
  523. t := time.NewTimer(next)
  524. defer t.Stop()
  525. for {
  526. select {
  527. case <-ctx.Done():
  528. return
  529. case <-t.C:
  530. if err := db.gcIndirect(ctx); err != nil {
  531. l.Warnln("Database indirection GC failed:", err)
  532. }
  533. db.recordTime(indirectGCTimeKey)
  534. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  535. }
  536. }
  537. }
  538. // recordTime records the current time under the given key, affecting the
  539. // next call to timeUntil with the same key.
  540. func (db *Lowlevel) recordTime(key string) {
  541. miscDB := NewMiscDataNamespace(db)
  542. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  543. }
  544. // timeUntil returns how long we should wait until the next interval, or
  545. // zero if it should happen directly.
  546. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  547. miscDB := NewMiscDataNamespace(db)
  548. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  549. nextTime := time.Unix(lastTime, 0).Add(every)
  550. sleepTime := time.Until(nextTime)
  551. if sleepTime < 0 {
  552. sleepTime = 0
  553. }
  554. return sleepTime
  555. }
  556. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  557. // The indirection GC uses bloom filters to track used block lists and
  558. // versions. This means iterating over all items, adding their hashes to
  559. // the filter, then iterating over the indirected items and removing
  560. // those that don't match the filter. The filter will give false
  561. // positives so we will keep around one percent of things that we don't
  562. // really need (at most).
  563. //
  564. // Indirection GC needs to run when there are no modifications to the
  565. // FileInfos or indirected items.
  566. db.gcMut.Lock()
  567. defer db.gcMut.Unlock()
  568. t, err := db.newReadWriteTransaction()
  569. if err != nil {
  570. return err
  571. }
  572. defer t.Release()
  573. // Set up the bloom filters with the initial capacity and false positive
  574. // rate, or higher capacity if we've done this before and seen lots of
  575. // items. For simplicity's sake we track just one count, which is the
  576. // highest of the various indirected items.
  577. capacity := indirectGCBloomCapacity
  578. if db.gcKeyCount > capacity {
  579. capacity = db.gcKeyCount
  580. }
  581. blockFilter := newBloomFilter(capacity)
  582. versionFilter := newBloomFilter(capacity)
  583. // Iterate the FileInfos, unmarshal the block and version hashes and
  584. // add them to the filter.
  585. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  586. if err != nil {
  587. return err
  588. }
  589. defer it.Release()
  590. for it.Next() {
  591. select {
  592. case <-ctx.Done():
  593. return ctx.Err()
  594. default:
  595. }
  596. var hashes IndirectionHashesOnly
  597. if err := hashes.Unmarshal(it.Value()); err != nil {
  598. return err
  599. }
  600. if len(hashes.BlocksHash) > 0 {
  601. blockFilter.Add(bloomHash(hashes.BlocksHash))
  602. }
  603. if len(hashes.VersionHash) > 0 {
  604. versionFilter.Add(bloomHash(hashes.VersionHash))
  605. }
  606. }
  607. it.Release()
  608. if err := it.Error(); err != nil {
  609. return err
  610. }
  611. // Iterate over block lists, removing keys with hashes that don't match
  612. // the filter.
  613. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  614. if err != nil {
  615. return err
  616. }
  617. defer it.Release()
  618. matchedBlocks := 0
  619. for it.Next() {
  620. select {
  621. case <-ctx.Done():
  622. return ctx.Err()
  623. default:
  624. }
  625. key := blockListKey(it.Key())
  626. if blockFilter.Has(bloomHash(key.Hash())) {
  627. matchedBlocks++
  628. continue
  629. }
  630. if err := t.Delete(key); err != nil {
  631. return err
  632. }
  633. }
  634. it.Release()
  635. if err := it.Error(); err != nil {
  636. return err
  637. }
  638. // Iterate over version lists, removing keys with hashes that don't match
  639. // the filter.
  640. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  641. if err != nil {
  642. return err
  643. }
  644. matchedVersions := 0
  645. for it.Next() {
  646. select {
  647. case <-ctx.Done():
  648. return ctx.Err()
  649. default:
  650. }
  651. key := versionKey(it.Key())
  652. if versionFilter.Has(bloomHash(key.Hash())) {
  653. matchedVersions++
  654. continue
  655. }
  656. if err := t.Delete(key); err != nil {
  657. return err
  658. }
  659. }
  660. it.Release()
  661. if err := it.Error(); err != nil {
  662. return err
  663. }
  664. // Remember the number of unique keys we kept until the next pass.
  665. db.gcKeyCount = matchedBlocks
  666. if matchedVersions > matchedBlocks {
  667. db.gcKeyCount = matchedVersions
  668. }
  669. if err := t.Commit(); err != nil {
  670. return err
  671. }
  672. return db.Compact()
  673. }
  674. func newBloomFilter(capacity int) *blobloom.Filter {
  675. return blobloom.NewOptimized(blobloom.Config{
  676. Capacity: uint64(capacity),
  677. FPRate: indirectGCBloomFalsePositiveRate,
  678. MaxBits: 8 * indirectGCBloomMaxBytes,
  679. })
  680. }
  681. // Hash function for the bloomfilter: first eight bytes of the SHA-256.
  682. // Big or little-endian makes no difference, as long as we're consistent.
  683. func bloomHash(key []byte) uint64 {
  684. if len(key) != sha256.Size {
  685. panic("bug: bloomHash passed something not a SHA256 hash")
  686. }
  687. return binary.BigEndian.Uint64(key)
  688. }
  689. // CheckRepair checks folder metadata and sequences for miscellaneous errors.
  690. func (db *Lowlevel) CheckRepair() {
  691. for _, folder := range db.ListFolders() {
  692. _ = db.getMetaAndCheck(folder)
  693. }
  694. }
  695. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  696. db.gcMut.RLock()
  697. defer db.gcMut.RUnlock()
  698. meta, err := db.recalcMeta(folder)
  699. if err == nil {
  700. var fixed int
  701. fixed, err = db.repairSequenceGCLocked(folder, meta)
  702. if fixed != 0 {
  703. l.Infof("Repaired %d sequence entries in database", fixed)
  704. }
  705. }
  706. if backend.IsClosed(err) {
  707. return nil
  708. } else if err != nil {
  709. panic(err)
  710. }
  711. return meta
  712. }
  713. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  714. meta := newMetadataTracker()
  715. if err := meta.fromDB(db, []byte(folder)); err != nil {
  716. l.Infof("No stored folder metadata for %q; recalculating", folder)
  717. return db.getMetaAndCheck(folder)
  718. }
  719. curSeq := meta.Sequence(protocol.LocalDeviceID)
  720. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  721. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  722. return db.getMetaAndCheck(folder)
  723. }
  724. if age := time.Since(meta.Created()); age > db.recheckInterval {
  725. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
  726. return db.getMetaAndCheck(folder)
  727. }
  728. return meta
  729. }
  730. func (db *Lowlevel) recalcMeta(folder string) (*metadataTracker, error) {
  731. meta := newMetadataTracker()
  732. if err := db.checkGlobals([]byte(folder)); err != nil {
  733. return nil, err
  734. }
  735. t, err := db.newReadWriteTransaction()
  736. if err != nil {
  737. return nil, err
  738. }
  739. defer t.close()
  740. var deviceID protocol.DeviceID
  741. err = t.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
  742. copy(deviceID[:], device)
  743. meta.addFile(deviceID, f)
  744. return true
  745. })
  746. if err != nil {
  747. return nil, err
  748. }
  749. err = t.withGlobal([]byte(folder), nil, true, func(f protocol.FileIntf) bool {
  750. meta.addFile(protocol.GlobalDeviceID, f)
  751. return true
  752. })
  753. meta.emptyNeeded(protocol.LocalDeviceID)
  754. err = t.withNeed([]byte(folder), protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  755. meta.addNeeded(protocol.LocalDeviceID, f)
  756. return true
  757. })
  758. if err != nil {
  759. return nil, err
  760. }
  761. for _, device := range meta.devices() {
  762. meta.emptyNeeded(device)
  763. err = t.withNeed([]byte(folder), device[:], true, func(f protocol.FileIntf) bool {
  764. meta.addNeeded(device, f)
  765. return true
  766. })
  767. if err != nil {
  768. return nil, err
  769. }
  770. }
  771. meta.SetCreated()
  772. if err := meta.toDB(t, []byte(folder)); err != nil {
  773. return nil, err
  774. }
  775. if err := t.Commit(); err != nil {
  776. return nil, err
  777. }
  778. return meta, nil
  779. }
  780. // Verify the local sequence number from actual sequence entries. Returns
  781. // true if it was all good, or false if a fixup was necessary.
  782. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  783. // Walk the sequence index from the current (supposedly) highest
  784. // sequence number and raise the alarm if we get anything. This recovers
  785. // from the occasion where we have written sequence entries to disk but
  786. // not yet written new metadata to disk.
  787. //
  788. // Note that we can have the same thing happen for remote devices but
  789. // there it's not a problem -- we'll simply advertise a lower sequence
  790. // number than we've actually seen and receive some duplicate updates
  791. // and then be in sync again.
  792. t, err := db.newReadOnlyTransaction()
  793. if err != nil {
  794. panic(err)
  795. }
  796. ok := true
  797. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  798. ok = false // we got something, which we should not have
  799. return false
  800. }); err != nil && !backend.IsClosed(err) {
  801. panic(err)
  802. }
  803. t.close()
  804. return ok
  805. }
// repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
// match those in the corresponding file entries. It returns the amount of fixed
// entries.
//
// It works in two passes over the local device's data for the folder: first
// every file entry gets a sequence entry pointing back at it (allocating a
// new sequence number from meta where needed), then every sequence entry
// that points at a missing file or a file with another sequence number is
// deleted. The metadata is written and the transaction committed at the end.
// As the name says, the caller is expected to hold the GC lock;
// getMetaAndCheck takes it for reading before calling this.
func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
	t, err := db.newReadWriteTransaction()
	if err != nil {
		return 0, err
	}
	defer t.close()

	fixed := 0

	folder := []byte(folderStr)

	// First check that every file entry has a matching sequence entry
	// (this was previously db schema upgrade to 9).

	dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
	if err != nil {
		return 0, err
	}
	it, err := t.NewPrefixIterator(dk.WithoutName())
	if err != nil {
		return 0, err
	}
	defer it.Release()

	var sk sequenceKey
	for it.Next() {
		intf, err := t.unmarshalTrunc(it.Value(), true)
		if err != nil {
			return 0, err
		}
		fi := intf.(FileInfoTruncated)
		if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
			return 0, err
		}
		// Look up what the sequence entry points at; dk is reused to hold
		// the result. A not-found error falls through into the repair case,
		// the same as an entry pointing at some other key.
		switch dk, err = t.Get(sk); {
		case err != nil:
			if !backend.IsNotFound(err) {
				return 0, err
			}
			fallthrough
		case !bytes.Equal(it.Key(), dk):
			// Give the file a fresh sequence number and write both the
			// sequence entry and the updated file.
			fixed++
			fi.Sequence = meta.nextLocalSeq()
			if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
				return 0, err
			}
			if err := t.Put(sk, it.Key()); err != nil {
				return 0, err
			}
			if err := t.putFile(it.Key(), fi.copyToFileInfo(), true); err != nil {
				return 0, err
			}
		}
		if err := t.Checkpoint(func() error {
			return meta.toDB(t, folder)
		}); err != nil {
			return 0, err
		}
	}
	if err := it.Error(); err != nil {
		return 0, err
	}

	it.Release()

	// Secondly check there's no sequence entries pointing at incorrect things.

	sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
	if err != nil {
		return 0, err
	}

	it, err = t.NewPrefixIterator(sk.WithoutSequence())
	if err != nil {
		return 0, err
	}
	defer it.Release()

	for it.Next() {
		// Check that the sequence from the key matches the
		// sequence in the file.
		fi, ok, err := t.getFileTrunc(it.Value(), true)
		if err != nil {
			return 0, err
		}
		if ok {
			if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
				continue
			}
		}
		// Either the file is missing or has a different sequence number
		fixed++
		if err := t.Delete(it.Key()); err != nil {
			return 0, err
		}
	}
	if err := it.Error(); err != nil {
		return 0, err
	}

	it.Release()

	if err := meta.toDB(t, folder); err != nil {
		return 0, err
	}

	return fixed, t.Commit()
}
  904. // unchanged checks if two files are the same and thus don't need to be updated.
  905. // Local flags or the invalid bit might change without the version
  906. // being bumped.
  907. func unchanged(nf, ef protocol.FileIntf) bool {
  908. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  909. }