lowlevel.go 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "io"
  12. "os"
  13. "time"
  14. "github.com/dchest/siphash"
  15. "github.com/greatroar/blobloom"
  16. "github.com/syncthing/syncthing/lib/db/backend"
  17. "github.com/syncthing/syncthing/lib/fs"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. "github.com/syncthing/syncthing/lib/rand"
  20. "github.com/syncthing/syncthing/lib/sha256"
  21. "github.com/syncthing/syncthing/lib/sync"
  22. "github.com/syncthing/syncthing/lib/util"
  23. "github.com/thejerf/suture"
  24. )
  25. const (
  26. // We set the bloom filter capacity to handle 100k individual items with
  27. // a false positive probability of 1% for the first pass. Once we know
  28. // how many items we have we will use that number instead, if it's more
  29. // than 100k. For fewer than 100k items we will just get better false
  30. // positive rate instead.
  31. indirectGCBloomCapacity = 100000
  32. indirectGCBloomFalsePositiveRate = 0.01 // 1%
  33. indirectGCBloomMaxBytes = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
  34. indirectGCDefaultInterval = 13 * time.Hour
  35. indirectGCTimeKey = "lastIndirectGCTime"
  36. // Use indirection for the block list when it exceeds this many entries
  37. blocksIndirectionCutoff = 3
  38. // Use indirection for the version vector when it exceeds this many entries
  39. versionIndirectionCutoff = 10
  40. recheckDefaultInterval = 30 * 24 * time.Hour
  41. needsRepairSuffix = ".needsrepair"
  42. )
  43. // Lowlevel is the lowest level database interface. It has a very simple
  44. // purpose: hold the actual backend database, and the in-memory state
  45. // that belong to that database. In the same way that a single on disk
  46. // database can only be opened once, there should be only one Lowlevel for
  47. // any given backend.
  48. type Lowlevel struct {
  49. *suture.Supervisor
  50. backend.Backend
  51. folderIdx *smallIndex
  52. deviceIdx *smallIndex
  53. keyer keyer
  54. gcMut sync.RWMutex
  55. gcKeyCount int
  56. indirectGCInterval time.Duration
  57. recheckInterval time.Duration
  58. }
  59. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  60. db := &Lowlevel{
  61. Supervisor: suture.New("db.Lowlevel", suture.Spec{
  62. // Only log restarts in debug mode.
  63. Log: func(line string) {
  64. l.Debugln(line)
  65. },
  66. PassThroughPanics: true,
  67. }),
  68. Backend: backend,
  69. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  70. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  71. gcMut: sync.NewRWMutex(),
  72. indirectGCInterval: indirectGCDefaultInterval,
  73. recheckInterval: recheckDefaultInterval,
  74. }
  75. for _, opt := range opts {
  76. opt(db)
  77. }
  78. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  79. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  80. if path := db.needsRepairPath(); path != "" {
  81. if _, err := os.Lstat(path); err == nil {
  82. l.Infoln("Database was marked for repair - this may take a while")
  83. db.checkRepair()
  84. os.Remove(path)
  85. }
  86. }
  87. return db
  88. }
  89. type Option func(*Lowlevel)
  90. // WithRecheckInterval sets the time interval in between metadata recalculations
  91. // and consistency checks.
  92. func WithRecheckInterval(dur time.Duration) Option {
  93. return func(db *Lowlevel) {
  94. if dur > 0 {
  95. db.recheckInterval = dur
  96. }
  97. }
  98. }
  99. // WithIndirectGCInterval sets the time interval in between GC runs.
  100. func WithIndirectGCInterval(dur time.Duration) Option {
  101. return func(db *Lowlevel) {
  102. if dur > 0 {
  103. db.indirectGCInterval = dur
  104. }
  105. }
  106. }
  107. // ListFolders returns the list of folders currently in the database
  108. func (db *Lowlevel) ListFolders() []string {
  109. return db.folderIdx.Values()
  110. }
  111. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  112. // global versionlist and metadata.
  113. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  114. db.gcMut.RLock()
  115. defer db.gcMut.RUnlock()
  116. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  117. if err != nil {
  118. return err
  119. }
  120. defer t.close()
  121. var dk, gk, keyBuf []byte
  122. devID, err := protocol.DeviceIDFromBytes(device)
  123. if err != nil {
  124. return err
  125. }
  126. for _, f := range fs {
  127. name := []byte(f.Name)
  128. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  129. if err != nil {
  130. return err
  131. }
  132. ef, ok, err := t.getFileTrunc(dk, true)
  133. if err != nil {
  134. return err
  135. }
  136. if ok && unchanged(f, ef) {
  137. l.Debugf("not inserting unchanged (remote); folder=%q device=%v %v", folder, devID, f)
  138. continue
  139. }
  140. if ok {
  141. meta.removeFile(devID, ef)
  142. }
  143. meta.addFile(devID, f)
  144. l.Debugf("insert (remote); folder=%q device=%v %v", folder, devID, f)
  145. if err := t.putFile(dk, f); err != nil {
  146. return err
  147. }
  148. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  149. if err != nil {
  150. return err
  151. }
  152. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  153. if err != nil {
  154. return err
  155. }
  156. if err := t.Checkpoint(); err != nil {
  157. return err
  158. }
  159. }
  160. return t.Commit()
  161. }
  162. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  163. // metadata, sequence and blockmap buckets.
  164. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  165. db.gcMut.RLock()
  166. defer db.gcMut.RUnlock()
  167. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  168. if err != nil {
  169. return err
  170. }
  171. defer t.close()
  172. var dk, gk, keyBuf []byte
  173. blockBuf := make([]byte, 4)
  174. for _, f := range fs {
  175. name := []byte(f.Name)
  176. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  177. if err != nil {
  178. return err
  179. }
  180. ef, ok, err := t.getFileByKey(dk)
  181. if err != nil {
  182. return err
  183. }
  184. if ok && unchanged(f, ef) {
  185. l.Debugf("not inserting unchanged (local); folder=%q %v", folder, f)
  186. continue
  187. }
  188. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  189. if ok {
  190. if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
  191. for _, block := range ef.Blocks {
  192. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  193. if err != nil {
  194. return err
  195. }
  196. if err := t.Delete(keyBuf); err != nil {
  197. return err
  198. }
  199. }
  200. if !blocksHashSame {
  201. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  202. if err != nil {
  203. return err
  204. }
  205. if err = t.Delete(keyBuf); err != nil {
  206. return err
  207. }
  208. }
  209. }
  210. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  211. if err != nil {
  212. return err
  213. }
  214. if err := t.Delete(keyBuf); err != nil {
  215. return err
  216. }
  217. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  218. }
  219. f.Sequence = meta.nextLocalSeq()
  220. if ok {
  221. meta.removeFile(protocol.LocalDeviceID, ef)
  222. }
  223. meta.addFile(protocol.LocalDeviceID, f)
  224. l.Debugf("insert (local); folder=%q %v", folder, f)
  225. if err := t.putFile(dk, f); err != nil {
  226. return err
  227. }
  228. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  229. if err != nil {
  230. return err
  231. }
  232. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  233. if err != nil {
  234. return err
  235. }
  236. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  237. if err != nil {
  238. return err
  239. }
  240. if err := t.Put(keyBuf, dk); err != nil {
  241. return err
  242. }
  243. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  244. if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
  245. for i, block := range f.Blocks {
  246. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  247. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  248. if err != nil {
  249. return err
  250. }
  251. if err := t.Put(keyBuf, blockBuf); err != nil {
  252. return err
  253. }
  254. }
  255. if !blocksHashSame {
  256. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  257. if err != nil {
  258. return err
  259. }
  260. if err = t.Put(keyBuf, nil); err != nil {
  261. return err
  262. }
  263. }
  264. }
  265. if err := t.Checkpoint(); err != nil {
  266. return err
  267. }
  268. }
  269. return t.Commit()
  270. }
  271. func (db *Lowlevel) dropFolder(folder []byte) error {
  272. db.gcMut.RLock()
  273. defer db.gcMut.RUnlock()
  274. t, err := db.newReadWriteTransaction()
  275. if err != nil {
  276. return err
  277. }
  278. defer t.close()
  279. // Remove all items related to the given folder from the device->file bucket
  280. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  281. if err != nil {
  282. return err
  283. }
  284. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  285. return err
  286. }
  287. // Remove all sequences related to the folder
  288. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  289. if err != nil {
  290. return err
  291. }
  292. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  293. return err
  294. }
  295. // Remove all items related to the given folder from the global bucket
  296. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  297. if err != nil {
  298. return err
  299. }
  300. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  301. return err
  302. }
  303. // Remove all needs related to the folder
  304. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  305. if err != nil {
  306. return err
  307. }
  308. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  309. return err
  310. }
  311. // Remove the blockmap of the folder
  312. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  313. if err != nil {
  314. return err
  315. }
  316. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  317. return err
  318. }
  319. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  320. if err != nil {
  321. return err
  322. }
  323. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  324. return err
  325. }
  326. return t.Commit()
  327. }
  328. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  329. db.gcMut.RLock()
  330. defer db.gcMut.RUnlock()
  331. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  332. if err != nil {
  333. return err
  334. }
  335. defer t.close()
  336. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  337. if err != nil {
  338. return err
  339. }
  340. dbi, err := t.NewPrefixIterator(key)
  341. if err != nil {
  342. return err
  343. }
  344. defer dbi.Release()
  345. var gk, keyBuf []byte
  346. for dbi.Next() {
  347. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  348. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  349. if err != nil {
  350. return err
  351. }
  352. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  353. if err != nil {
  354. return err
  355. }
  356. if err := t.Delete(dbi.Key()); err != nil {
  357. return err
  358. }
  359. if err := t.Checkpoint(); err != nil {
  360. return err
  361. }
  362. }
  363. dbi.Release()
  364. if err := dbi.Error(); err != nil {
  365. return err
  366. }
  367. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  368. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  369. if err != nil {
  370. return err
  371. }
  372. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  373. return err
  374. }
  375. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  376. if err != nil {
  377. return err
  378. }
  379. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  380. return err
  381. }
  382. }
  383. return t.Commit()
  384. }
  385. func (db *Lowlevel) checkGlobals(folder []byte) error {
  386. t, err := db.newReadWriteTransaction()
  387. if err != nil {
  388. return err
  389. }
  390. defer t.close()
  391. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  392. if err != nil {
  393. return err
  394. }
  395. dbi, err := t.NewPrefixIterator(key.WithoutName())
  396. if err != nil {
  397. return err
  398. }
  399. defer dbi.Release()
  400. var dk []byte
  401. ro := t.readOnlyTransaction
  402. for dbi.Next() {
  403. var vl VersionList
  404. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  405. if err := t.Delete(dbi.Key()); err != nil {
  406. return err
  407. }
  408. continue
  409. }
  410. // Check the global version list for consistency. An issue in previous
  411. // versions of goleveldb could result in reordered writes so that
  412. // there are global entries pointing to no longer existing files. Here
  413. // we find those and clear them out.
  414. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  415. newVL := &VersionList{}
  416. var changed, changedHere bool
  417. for _, fv := range vl.RawVersions {
  418. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  419. if err != nil {
  420. return err
  421. }
  422. changed = changed || changedHere
  423. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  424. if err != nil {
  425. return err
  426. }
  427. changed = changed || changedHere
  428. }
  429. if newVL.Empty() {
  430. if err := t.Delete(dbi.Key()); err != nil {
  431. return err
  432. }
  433. } else if changed {
  434. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  435. return err
  436. }
  437. }
  438. }
  439. dbi.Release()
  440. if err := dbi.Error(); err != nil {
  441. return err
  442. }
  443. l.Debugf("db check completed for %q", folder)
  444. return t.Commit()
  445. }
  446. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  447. var changed bool
  448. var err error
  449. for _, device := range devices {
  450. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  451. if err != nil {
  452. return false, err
  453. }
  454. f, ok, err := t.getFileTrunc(dk, false)
  455. if err != nil {
  456. return false, err
  457. }
  458. if !ok {
  459. changed = true
  460. continue
  461. }
  462. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  463. if err != nil {
  464. return false, err
  465. }
  466. }
  467. return changed, nil
  468. }
  469. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  470. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  471. if err != nil {
  472. return 0, err
  473. }
  474. cur, err := db.Get(key)
  475. if backend.IsNotFound(err) {
  476. return 0, nil
  477. } else if err != nil {
  478. return 0, err
  479. }
  480. var id protocol.IndexID
  481. if err := id.Unmarshal(cur); err != nil {
  482. return 0, nil
  483. }
  484. return id, nil
  485. }
  486. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  487. bs, _ := id.Marshal() // marshalling can't fail
  488. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  489. if err != nil {
  490. return err
  491. }
  492. return db.Put(key, bs)
  493. }
  494. func (db *Lowlevel) dropMtimes(folder []byte) error {
  495. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  496. if err != nil {
  497. return err
  498. }
  499. return db.dropPrefix(key)
  500. }
  501. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  502. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  503. if err != nil {
  504. return err
  505. }
  506. return db.dropPrefix(key)
  507. }
  508. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  509. t, err := db.newReadWriteTransaction()
  510. if err != nil {
  511. return err
  512. }
  513. defer t.close()
  514. if err := t.deleteKeyPrefix(prefix); err != nil {
  515. return err
  516. }
  517. return t.Commit()
  518. }
  519. func (db *Lowlevel) gcRunner(ctx context.Context) {
  520. // Calculate the time for the next GC run. Even if we should run GC
  521. // directly, give the system a while to get up and running and do other
  522. // stuff first. (We might have migrations and stuff which would be
  523. // better off running before GC.)
  524. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  525. if next < time.Minute {
  526. next = time.Minute
  527. }
  528. t := time.NewTimer(next)
  529. defer t.Stop()
  530. for {
  531. select {
  532. case <-ctx.Done():
  533. return
  534. case <-t.C:
  535. if err := db.gcIndirect(ctx); err != nil {
  536. l.Warnln("Database indirection GC failed:", err)
  537. }
  538. db.recordTime(indirectGCTimeKey)
  539. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  540. }
  541. }
  542. }
  543. // recordTime records the current time under the given key, affecting the
  544. // next call to timeUntil with the same key.
  545. func (db *Lowlevel) recordTime(key string) {
  546. miscDB := NewMiscDataNamespace(db)
  547. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  548. }
  549. // timeUntil returns how long we should wait until the next interval, or
  550. // zero if it should happen directly.
  551. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  552. miscDB := NewMiscDataNamespace(db)
  553. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  554. nextTime := time.Unix(lastTime, 0).Add(every)
  555. sleepTime := time.Until(nextTime)
  556. if sleepTime < 0 {
  557. sleepTime = 0
  558. }
  559. return sleepTime
  560. }
  561. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  562. // The indirection GC uses bloom filters to track used block lists and
  563. // versions. This means iterating over all items, adding their hashes to
  564. // the filter, then iterating over the indirected items and removing
  565. // those that don't match the filter. The filter will give false
  566. // positives so we will keep around one percent of things that we don't
  567. // really need (at most).
  568. //
  569. // Indirection GC needs to run when there are no modifications to the
  570. // FileInfos or indirected items.
  571. db.gcMut.Lock()
  572. defer db.gcMut.Unlock()
  573. t, err := db.newReadWriteTransaction()
  574. if err != nil {
  575. return err
  576. }
  577. defer t.Release()
  578. // Set up the bloom filters with the initial capacity and false positive
  579. // rate, or higher capacity if we've done this before and seen lots of
  580. // items. For simplicity's sake we track just one count, which is the
  581. // highest of the various indirected items.
  582. capacity := indirectGCBloomCapacity
  583. if db.gcKeyCount > capacity {
  584. capacity = db.gcKeyCount
  585. }
  586. blockFilter := newBloomFilter(capacity)
  587. versionFilter := newBloomFilter(capacity)
  588. // Iterate the FileInfos, unmarshal the block and version hashes and
  589. // add them to the filter.
  590. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  591. if err != nil {
  592. return err
  593. }
  594. defer it.Release()
  595. for it.Next() {
  596. select {
  597. case <-ctx.Done():
  598. return ctx.Err()
  599. default:
  600. }
  601. var hashes IndirectionHashesOnly
  602. if err := hashes.Unmarshal(it.Value()); err != nil {
  603. return err
  604. }
  605. if len(hashes.BlocksHash) > 0 {
  606. blockFilter.add(hashes.BlocksHash)
  607. }
  608. if len(hashes.VersionHash) > 0 {
  609. versionFilter.add(hashes.VersionHash)
  610. }
  611. }
  612. it.Release()
  613. if err := it.Error(); err != nil {
  614. return err
  615. }
  616. // Iterate over block lists, removing keys with hashes that don't match
  617. // the filter.
  618. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  619. if err != nil {
  620. return err
  621. }
  622. defer it.Release()
  623. matchedBlocks := 0
  624. for it.Next() {
  625. select {
  626. case <-ctx.Done():
  627. return ctx.Err()
  628. default:
  629. }
  630. key := blockListKey(it.Key())
  631. if blockFilter.has(key.Hash()) {
  632. matchedBlocks++
  633. continue
  634. }
  635. if err := t.Delete(key); err != nil {
  636. return err
  637. }
  638. }
  639. it.Release()
  640. if err := it.Error(); err != nil {
  641. return err
  642. }
  643. // Iterate over version lists, removing keys with hashes that don't match
  644. // the filter.
  645. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  646. if err != nil {
  647. return err
  648. }
  649. matchedVersions := 0
  650. for it.Next() {
  651. select {
  652. case <-ctx.Done():
  653. return ctx.Err()
  654. default:
  655. }
  656. key := versionKey(it.Key())
  657. if versionFilter.has(key.Hash()) {
  658. matchedVersions++
  659. continue
  660. }
  661. if err := t.Delete(key); err != nil {
  662. return err
  663. }
  664. }
  665. it.Release()
  666. if err := it.Error(); err != nil {
  667. return err
  668. }
  669. // Remember the number of unique keys we kept until the next pass.
  670. db.gcKeyCount = matchedBlocks
  671. if matchedVersions > matchedBlocks {
  672. db.gcKeyCount = matchedVersions
  673. }
  674. if err := t.Commit(); err != nil {
  675. return err
  676. }
  677. return db.Compact()
  678. }
  679. func newBloomFilter(capacity int) bloomFilter {
  680. var buf [16]byte
  681. io.ReadFull(rand.Reader, buf[:])
  682. return bloomFilter{
  683. f: blobloom.NewOptimized(blobloom.Config{
  684. Capacity: uint64(capacity),
  685. FPRate: indirectGCBloomFalsePositiveRate,
  686. MaxBits: 8 * indirectGCBloomMaxBytes,
  687. }),
  688. k0: binary.LittleEndian.Uint64(buf[:8]),
  689. k1: binary.LittleEndian.Uint64(buf[8:]),
  690. }
  691. }
  692. type bloomFilter struct {
  693. f *blobloom.Filter
  694. k0, k1 uint64 // Random key for SipHash.
  695. }
  696. func (b *bloomFilter) add(id []byte) { b.f.Add(b.hash(id)) }
  697. func (b *bloomFilter) has(id []byte) bool { return b.f.Has(b.hash(id)) }
  698. // Hash function for the bloomfilter: SipHash of the SHA-256.
  699. //
  700. // The randomization in SipHash means we get different collisions across
  701. // runs and colliding keys are not kept indefinitely.
  702. func (b *bloomFilter) hash(id []byte) uint64 {
  703. if len(id) != sha256.Size {
  704. panic("bug: bloomFilter.hash passed something not a SHA256 hash")
  705. }
  706. return siphash.Hash(b.k0, b.k1, id)
  707. }
  708. // checkRepair checks folder metadata and sequences for miscellaneous errors.
  709. func (db *Lowlevel) checkRepair() {
  710. for _, folder := range db.ListFolders() {
  711. _ = db.getMetaAndCheck(folder)
  712. }
  713. }
  714. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  715. db.gcMut.RLock()
  716. defer db.gcMut.RUnlock()
  717. var err error
  718. defer func() {
  719. if err != nil && !backend.IsClosed(err) {
  720. panic(err)
  721. }
  722. }()
  723. var fixed int
  724. fixed, err = db.checkLocalNeed([]byte(folder))
  725. if err != nil {
  726. return nil
  727. }
  728. if fixed != 0 {
  729. l.Infof("Repaired %d local need entries for folder %v in database", fixed, folder)
  730. }
  731. meta, err := db.recalcMeta(folder)
  732. if err != nil {
  733. return nil
  734. }
  735. fixed, err = db.repairSequenceGCLocked(folder, meta)
  736. if err != nil {
  737. return nil
  738. }
  739. if fixed != 0 {
  740. l.Infof("Repaired %d sequence entries for folder %v in database", fixed, folder)
  741. }
  742. return meta
  743. }
  744. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  745. meta := newMetadataTracker(db.keyer)
  746. if err := meta.fromDB(db, []byte(folder)); err != nil {
  747. if err == errMetaInconsistent {
  748. l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
  749. } else {
  750. l.Infof("No stored folder metadata for %q; recalculating", folder)
  751. }
  752. return db.getMetaAndCheck(folder)
  753. }
  754. curSeq := meta.Sequence(protocol.LocalDeviceID)
  755. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  756. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  757. return db.getMetaAndCheck(folder)
  758. }
  759. if age := time.Since(meta.Created()); age > db.recheckInterval {
  760. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, util.NiceDurationString(age))
  761. return db.getMetaAndCheck(folder)
  762. }
  763. return meta
  764. }
  765. func (db *Lowlevel) recalcMeta(folderStr string) (*metadataTracker, error) {
  766. folder := []byte(folderStr)
  767. meta := newMetadataTracker(db.keyer)
  768. if err := db.checkGlobals(folder); err != nil {
  769. return nil, err
  770. }
  771. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  772. if err != nil {
  773. return nil, err
  774. }
  775. defer t.close()
  776. var deviceID protocol.DeviceID
  777. err = t.withAllFolderTruncated(folder, func(device []byte, f FileInfoTruncated) bool {
  778. copy(deviceID[:], device)
  779. meta.addFile(deviceID, f)
  780. return true
  781. })
  782. if err != nil {
  783. return nil, err
  784. }
  785. err = t.withGlobal(folder, nil, true, func(f protocol.FileIntf) bool {
  786. meta.addFile(protocol.GlobalDeviceID, f)
  787. return true
  788. })
  789. meta.emptyNeeded(protocol.LocalDeviceID)
  790. err = t.withNeed(folder, protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  791. meta.addNeeded(protocol.LocalDeviceID, f)
  792. return true
  793. })
  794. if err != nil {
  795. return nil, err
  796. }
  797. for _, device := range meta.devices() {
  798. meta.emptyNeeded(device)
  799. err = t.withNeed(folder, device[:], true, func(f protocol.FileIntf) bool {
  800. meta.addNeeded(device, f)
  801. return true
  802. })
  803. if err != nil {
  804. return nil, err
  805. }
  806. }
  807. meta.SetCreated()
  808. if err := t.Commit(); err != nil {
  809. return nil, err
  810. }
  811. return meta, nil
  812. }
  813. // Verify the local sequence number from actual sequence entries. Returns
  814. // true if it was all good, or false if a fixup was necessary.
  815. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  816. // Walk the sequence index from the current (supposedly) highest
  817. // sequence number and raise the alarm if we get anything. This recovers
  818. // from the occasion where we have written sequence entries to disk but
  819. // not yet written new metadata to disk.
  820. //
  821. // Note that we can have the same thing happen for remote devices but
  822. // there it's not a problem -- we'll simply advertise a lower sequence
  823. // number than we've actually seen and receive some duplicate updates
  824. // and then be in sync again.
  825. t, err := db.newReadOnlyTransaction()
  826. if err != nil {
  827. panic(err)
  828. }
  829. ok := true
  830. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  831. ok = false // we got something, which we should not have
  832. return false
  833. }); err != nil && !backend.IsClosed(err) {
  834. panic(err)
  835. }
  836. t.close()
  837. return ok
  838. }
  839. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  840. // match those in the corresponding file entries. It returns the amount of fixed
  841. // entries.
  842. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  843. t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
  844. if err != nil {
  845. return 0, err
  846. }
  847. defer t.close()
  848. fixed := 0
  849. folder := []byte(folderStr)
  850. // First check that every file entry has a matching sequence entry
  851. // (this was previously db schema upgrade to 9).
  852. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  853. if err != nil {
  854. return 0, err
  855. }
  856. it, err := t.NewPrefixIterator(dk.WithoutName())
  857. if err != nil {
  858. return 0, err
  859. }
  860. defer it.Release()
  861. var sk sequenceKey
  862. for it.Next() {
  863. intf, err := t.unmarshalTrunc(it.Value(), false)
  864. if err != nil {
  865. return 0, err
  866. }
  867. fi := intf.(protocol.FileInfo)
  868. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  869. return 0, err
  870. }
  871. switch dk, err = t.Get(sk); {
  872. case err != nil:
  873. if !backend.IsNotFound(err) {
  874. return 0, err
  875. }
  876. fallthrough
  877. case !bytes.Equal(it.Key(), dk):
  878. fixed++
  879. fi.Sequence = meta.nextLocalSeq()
  880. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  881. return 0, err
  882. }
  883. if err := t.Put(sk, it.Key()); err != nil {
  884. return 0, err
  885. }
  886. if err := t.putFile(it.Key(), fi); err != nil {
  887. return 0, err
  888. }
  889. }
  890. if err := t.Checkpoint(); err != nil {
  891. return 0, err
  892. }
  893. }
  894. if err := it.Error(); err != nil {
  895. return 0, err
  896. }
  897. it.Release()
  898. // Secondly check there's no sequence entries pointing at incorrect things.
  899. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  900. if err != nil {
  901. return 0, err
  902. }
  903. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  904. if err != nil {
  905. return 0, err
  906. }
  907. defer it.Release()
  908. for it.Next() {
  909. // Check that the sequence from the key matches the
  910. // sequence in the file.
  911. fi, ok, err := t.getFileTrunc(it.Value(), true)
  912. if err != nil {
  913. return 0, err
  914. }
  915. if ok {
  916. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  917. continue
  918. }
  919. }
  920. // Either the file is missing or has a different sequence number
  921. fixed++
  922. if err := t.Delete(it.Key()); err != nil {
  923. return 0, err
  924. }
  925. }
  926. if err := it.Error(); err != nil {
  927. return 0, err
  928. }
  929. it.Release()
  930. return fixed, t.Commit()
  931. }
  932. // Does not take care of metadata - if anything is repaired, the need count
  933. // needs to be recalculated.
  934. func (db *Lowlevel) checkLocalNeed(folder []byte) (int, error) {
  935. repaired := 0
  936. t, err := db.newReadWriteTransaction()
  937. if err != nil {
  938. return 0, err
  939. }
  940. defer t.close()
  941. key, err := t.keyer.GenerateNeedFileKey(nil, folder, nil)
  942. if err != nil {
  943. return 0, err
  944. }
  945. dbi, err := t.NewPrefixIterator(key.WithoutName())
  946. if err != nil {
  947. return 0, err
  948. }
  949. defer dbi.Release()
  950. var needName string
  951. var needDone bool
  952. next := func() {
  953. needDone = !dbi.Next()
  954. if !needDone {
  955. needName = string(t.keyer.NameFromGlobalVersionKey(dbi.Key()))
  956. }
  957. }
  958. next()
  959. t.withNeedIteratingGlobal(folder, protocol.LocalDeviceID[:], true, func(fi protocol.FileIntf) bool {
  960. f := fi.(FileInfoTruncated)
  961. for !needDone && needName < f.Name {
  962. repaired++
  963. if err = t.Delete(dbi.Key()); err != nil {
  964. return false
  965. }
  966. l.Debugln("check local need: removing", needName)
  967. next()
  968. }
  969. if needName == f.Name {
  970. next()
  971. } else {
  972. repaired++
  973. key, err = t.keyer.GenerateNeedFileKey(key, folder, []byte(f.Name))
  974. if err != nil {
  975. return false
  976. }
  977. if err = t.Put(key, nil); err != nil {
  978. return false
  979. }
  980. l.Debugln("check local need: adding", f.Name)
  981. }
  982. return true
  983. })
  984. if err != nil {
  985. return 0, err
  986. }
  987. for !needDone {
  988. repaired++
  989. if err := t.Delete(dbi.Key()); err != nil {
  990. return 0, err
  991. }
  992. l.Debugln("check local need: removing", needName)
  993. next()
  994. }
  995. if err := dbi.Error(); err != nil {
  996. return 0, err
  997. }
  998. dbi.Release()
  999. if err = t.Commit(); err != nil {
  1000. return 0, err
  1001. }
  1002. return repaired, nil
  1003. }
  1004. func (db *Lowlevel) needsRepairPath() string {
  1005. path := db.Location()
  1006. if path == "" {
  1007. return ""
  1008. }
  1009. if path[len(path)-1] == fs.PathSeparator {
  1010. path = path[:len(path)-1]
  1011. }
  1012. return path + needsRepairSuffix
  1013. }
  1014. // unchanged checks if two files are the same and thus don't need to be updated.
  1015. // Local flags or the invalid bit might change without the version
  1016. // being bumped.
  1017. func unchanged(nf, ef protocol.FileIntf) bool {
  1018. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  1019. }