lowlevel.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "os"
  15. "regexp"
  16. "time"
  17. "github.com/dchest/siphash"
  18. "github.com/greatroar/blobloom"
  19. "github.com/syncthing/syncthing/lib/db/backend"
  20. "github.com/syncthing/syncthing/lib/events"
  21. "github.com/syncthing/syncthing/lib/fs"
  22. "github.com/syncthing/syncthing/lib/protocol"
  23. "github.com/syncthing/syncthing/lib/rand"
  24. "github.com/syncthing/syncthing/lib/sha256"
  25. "github.com/syncthing/syncthing/lib/svcutil"
  26. "github.com/syncthing/syncthing/lib/sync"
  27. "github.com/syncthing/syncthing/lib/util"
  28. "github.com/thejerf/suture/v4"
  29. )
  30. const (
  31. // We set the bloom filter capacity to handle 100k individual items with
  32. // a false positive probability of 1% for the first pass. Once we know
  33. // how many items we have we will use that number instead, if it's more
  34. // than 100k. For fewer than 100k items we will just get better false
  35. // positive rate instead.
  36. indirectGCBloomCapacity = 100000
  37. indirectGCBloomFalsePositiveRate = 0.01 // 1%
  38. indirectGCBloomMaxBytes = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
  39. indirectGCDefaultInterval = 13 * time.Hour
  40. indirectGCTimeKey = "lastIndirectGCTime"
  41. // Use indirection for the block list when it exceeds this many entries
  42. blocksIndirectionCutoff = 3
  43. // Use indirection for the version vector when it exceeds this many entries
  44. versionIndirectionCutoff = 10
  45. recheckDefaultInterval = 30 * 24 * time.Hour
  46. needsRepairSuffix = ".needsrepair"
  47. )
  48. // Lowlevel is the lowest level database interface. It has a very simple
  49. // purpose: hold the actual backend database, and the in-memory state
  50. // that belong to that database. In the same way that a single on disk
  51. // database can only be opened once, there should be only one Lowlevel for
  52. // any given backend.
  53. type Lowlevel struct {
  54. *suture.Supervisor
  55. backend.Backend
  56. folderIdx *smallIndex
  57. deviceIdx *smallIndex
  58. keyer keyer
  59. gcMut sync.RWMutex
  60. gcKeyCount int
  61. indirectGCInterval time.Duration
  62. recheckInterval time.Duration
  63. oneFileSetCreated chan struct{}
  64. evLogger events.Logger
  65. }
  66. func NewLowlevel(backend backend.Backend, evLogger events.Logger, opts ...Option) (*Lowlevel, error) {
  67. // Only log restarts in debug mode.
  68. spec := svcutil.SpecWithDebugLogger(l)
  69. db := &Lowlevel{
  70. Supervisor: suture.New("db.Lowlevel", spec),
  71. Backend: backend,
  72. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  73. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  74. gcMut: sync.NewRWMutex(),
  75. indirectGCInterval: indirectGCDefaultInterval,
  76. recheckInterval: recheckDefaultInterval,
  77. oneFileSetCreated: make(chan struct{}),
  78. evLogger: evLogger,
  79. }
  80. for _, opt := range opts {
  81. opt(db)
  82. }
  83. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  84. db.Add(svcutil.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  85. if path := db.needsRepairPath(); path != "" {
  86. if _, err := os.Lstat(path); err == nil {
  87. l.Infoln("Database was marked for repair - this may take a while")
  88. if err := db.checkRepair(); err != nil {
  89. db.handleFailure(err)
  90. return nil, err
  91. }
  92. os.Remove(path)
  93. }
  94. }
  95. return db, nil
  96. }
  97. type Option func(*Lowlevel)
  98. // WithRecheckInterval sets the time interval in between metadata recalculations
  99. // and consistency checks.
  100. func WithRecheckInterval(dur time.Duration) Option {
  101. return func(db *Lowlevel) {
  102. if dur > 0 {
  103. db.recheckInterval = dur
  104. }
  105. }
  106. }
  107. // WithIndirectGCInterval sets the time interval in between GC runs.
  108. func WithIndirectGCInterval(dur time.Duration) Option {
  109. return func(db *Lowlevel) {
  110. if dur > 0 {
  111. db.indirectGCInterval = dur
  112. }
  113. }
  114. }
  115. // ListFolders returns the list of folders currently in the database
  116. func (db *Lowlevel) ListFolders() []string {
  117. return db.folderIdx.Values()
  118. }
  119. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  120. // global versionlist and metadata.
  121. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  122. db.gcMut.RLock()
  123. defer db.gcMut.RUnlock()
  124. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  125. if err != nil {
  126. return err
  127. }
  128. defer t.close()
  129. var dk, gk, keyBuf []byte
  130. devID, err := protocol.DeviceIDFromBytes(device)
  131. if err != nil {
  132. return err
  133. }
  134. for _, f := range fs {
  135. name := []byte(f.Name)
  136. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  137. if err != nil {
  138. return err
  139. }
  140. ef, ok, err := t.getFileTrunc(dk, true)
  141. if err != nil {
  142. return err
  143. }
  144. if ok && unchanged(f, ef) {
  145. l.Debugf("not inserting unchanged (remote); folder=%q device=%v %v", folder, devID, f)
  146. continue
  147. }
  148. if ok {
  149. meta.removeFile(devID, ef)
  150. }
  151. meta.addFile(devID, f)
  152. l.Debugf("insert (remote); folder=%q device=%v %v", folder, devID, f)
  153. if err := t.putFile(dk, f); err != nil {
  154. return err
  155. }
  156. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  157. if err != nil {
  158. return err
  159. }
  160. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  161. if err != nil {
  162. return err
  163. }
  164. if err := t.Checkpoint(); err != nil {
  165. return err
  166. }
  167. }
  168. return t.Commit()
  169. }
  170. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  171. // metadata, sequence and blockmap buckets.
  172. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  173. db.gcMut.RLock()
  174. defer db.gcMut.RUnlock()
  175. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  176. if err != nil {
  177. return err
  178. }
  179. defer t.close()
  180. var dk, gk, keyBuf []byte
  181. blockBuf := make([]byte, 4)
  182. for _, f := range fs {
  183. name := []byte(f.Name)
  184. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  185. if err != nil {
  186. return err
  187. }
  188. ef, ok, err := t.getFileByKey(dk)
  189. if err != nil {
  190. return err
  191. }
  192. if ok && unchanged(f, ef) {
  193. l.Debugf("not inserting unchanged (local); folder=%q %v", folder, f)
  194. continue
  195. }
  196. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  197. if ok {
  198. if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
  199. for _, block := range ef.Blocks {
  200. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  201. if err != nil {
  202. return err
  203. }
  204. if err := t.Delete(keyBuf); err != nil {
  205. return err
  206. }
  207. }
  208. if !blocksHashSame {
  209. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  210. if err != nil {
  211. return err
  212. }
  213. if err = t.Delete(keyBuf); err != nil {
  214. return err
  215. }
  216. }
  217. }
  218. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  219. if err != nil {
  220. return err
  221. }
  222. if err := t.Delete(keyBuf); err != nil {
  223. return err
  224. }
  225. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  226. }
  227. f.Sequence = meta.nextLocalSeq()
  228. if ok {
  229. meta.removeFile(protocol.LocalDeviceID, ef)
  230. }
  231. meta.addFile(protocol.LocalDeviceID, f)
  232. l.Debugf("insert (local); folder=%q %v", folder, f)
  233. if err := t.putFile(dk, f); err != nil {
  234. return err
  235. }
  236. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  237. if err != nil {
  238. return err
  239. }
  240. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  241. if err != nil {
  242. return err
  243. }
  244. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  245. if err != nil {
  246. return err
  247. }
  248. if err := t.Put(keyBuf, dk); err != nil {
  249. return err
  250. }
  251. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  252. if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
  253. for i, block := range f.Blocks {
  254. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  255. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  256. if err != nil {
  257. return err
  258. }
  259. if err := t.Put(keyBuf, blockBuf); err != nil {
  260. return err
  261. }
  262. }
  263. if !blocksHashSame {
  264. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  265. if err != nil {
  266. return err
  267. }
  268. if err = t.Put(keyBuf, nil); err != nil {
  269. return err
  270. }
  271. }
  272. }
  273. if err := t.Checkpoint(); err != nil {
  274. return err
  275. }
  276. }
  277. return t.Commit()
  278. }
  279. func (db *Lowlevel) dropFolder(folder []byte) error {
  280. db.gcMut.RLock()
  281. defer db.gcMut.RUnlock()
  282. t, err := db.newReadWriteTransaction()
  283. if err != nil {
  284. return err
  285. }
  286. defer t.close()
  287. // Remove all items related to the given folder from the device->file bucket
  288. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  289. if err != nil {
  290. return err
  291. }
  292. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  293. return err
  294. }
  295. // Remove all sequences related to the folder
  296. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  297. if err != nil {
  298. return err
  299. }
  300. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  301. return err
  302. }
  303. // Remove all items related to the given folder from the global bucket
  304. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  305. if err != nil {
  306. return err
  307. }
  308. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  309. return err
  310. }
  311. // Remove all needs related to the folder
  312. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  313. if err != nil {
  314. return err
  315. }
  316. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  317. return err
  318. }
  319. // Remove the blockmap of the folder
  320. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  321. if err != nil {
  322. return err
  323. }
  324. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  325. return err
  326. }
  327. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  328. if err != nil {
  329. return err
  330. }
  331. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  332. return err
  333. }
  334. return t.Commit()
  335. }
  336. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  337. db.gcMut.RLock()
  338. defer db.gcMut.RUnlock()
  339. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  340. if err != nil {
  341. return err
  342. }
  343. defer t.close()
  344. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  345. if err != nil {
  346. return err
  347. }
  348. dbi, err := t.NewPrefixIterator(key)
  349. if err != nil {
  350. return err
  351. }
  352. defer dbi.Release()
  353. var gk, keyBuf []byte
  354. for dbi.Next() {
  355. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  356. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  357. if err != nil {
  358. return err
  359. }
  360. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  361. if err != nil {
  362. return err
  363. }
  364. if err := t.Delete(dbi.Key()); err != nil {
  365. return err
  366. }
  367. if err := t.Checkpoint(); err != nil {
  368. return err
  369. }
  370. }
  371. dbi.Release()
  372. if err := dbi.Error(); err != nil {
  373. return err
  374. }
  375. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  376. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  377. if err != nil {
  378. return err
  379. }
  380. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  381. return err
  382. }
  383. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  384. if err != nil {
  385. return err
  386. }
  387. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  388. return err
  389. }
  390. }
  391. return t.Commit()
  392. }
  393. func (db *Lowlevel) checkGlobals(folder []byte) (int, error) {
  394. t, err := db.newReadWriteTransaction()
  395. if err != nil {
  396. return 0, err
  397. }
  398. defer t.close()
  399. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  400. if err != nil {
  401. return 0, err
  402. }
  403. dbi, err := t.NewPrefixIterator(key.WithoutName())
  404. if err != nil {
  405. return 0, err
  406. }
  407. defer dbi.Release()
  408. fixed := 0
  409. var dk []byte
  410. ro := t.readOnlyTransaction
  411. for dbi.Next() {
  412. var vl VersionList
  413. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  414. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  415. return 0, err
  416. }
  417. continue
  418. }
  419. // Check the global version list for consistency. An issue in previous
  420. // versions of goleveldb could result in reordered writes so that
  421. // there are global entries pointing to no longer existing files. Here
  422. // we find those and clear them out.
  423. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  424. newVL := &VersionList{}
  425. var changed, changedHere bool
  426. for _, fv := range vl.RawVersions {
  427. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  428. if err != nil {
  429. return 0, err
  430. }
  431. changed = changed || changedHere
  432. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  433. if err != nil {
  434. return 0, err
  435. }
  436. changed = changed || changedHere
  437. }
  438. if newVL.Empty() {
  439. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  440. return 0, err
  441. }
  442. fixed++
  443. } else if changed {
  444. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  445. return 0, err
  446. }
  447. fixed++
  448. }
  449. }
  450. dbi.Release()
  451. if err := dbi.Error(); err != nil {
  452. return 0, err
  453. }
  454. l.Debugf("global db check completed for %q", folder)
  455. return fixed, t.Commit()
  456. }
  457. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  458. var changed bool
  459. var err error
  460. for _, device := range devices {
  461. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  462. if err != nil {
  463. return false, err
  464. }
  465. f, ok, err := t.getFileTrunc(dk, false)
  466. if err != nil {
  467. return false, err
  468. }
  469. if !ok {
  470. changed = true
  471. continue
  472. }
  473. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  474. if err != nil {
  475. return false, err
  476. }
  477. }
  478. return changed, nil
  479. }
  480. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  481. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  482. if err != nil {
  483. return 0, err
  484. }
  485. cur, err := db.Get(key)
  486. if backend.IsNotFound(err) {
  487. return 0, nil
  488. } else if err != nil {
  489. return 0, err
  490. }
  491. var id protocol.IndexID
  492. if err := id.Unmarshal(cur); err != nil {
  493. return 0, nil
  494. }
  495. return id, nil
  496. }
  497. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  498. bs, _ := id.Marshal() // marshalling can't fail
  499. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  500. if err != nil {
  501. return err
  502. }
  503. return db.Put(key, bs)
  504. }
  505. func (db *Lowlevel) dropFolderIndexIDs(folder []byte) error {
  506. t, err := db.newReadWriteTransaction()
  507. if err != nil {
  508. return err
  509. }
  510. defer t.close()
  511. if err := t.deleteKeyPrefixMatching([]byte{KeyTypeIndexID}, func(key []byte) bool {
  512. keyFolder, ok := t.keyer.FolderFromIndexIDKey(key)
  513. if !ok {
  514. l.Debugf("Deleting IndexID with missing FolderIdx: %v", key)
  515. return true
  516. }
  517. return bytes.Equal(keyFolder, folder)
  518. }); err != nil {
  519. return err
  520. }
  521. return t.Commit()
  522. }
  523. func (db *Lowlevel) dropMtimes(folder []byte) error {
  524. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  525. if err != nil {
  526. return err
  527. }
  528. return db.dropPrefix(key)
  529. }
  530. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  531. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  532. if err != nil {
  533. return err
  534. }
  535. return db.dropPrefix(key)
  536. }
  537. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  538. t, err := db.newReadWriteTransaction()
  539. if err != nil {
  540. return err
  541. }
  542. defer t.close()
  543. if err := t.deleteKeyPrefix(prefix); err != nil {
  544. return err
  545. }
  546. return t.Commit()
  547. }
  548. func (db *Lowlevel) gcRunner(ctx context.Context) error {
  549. // Calculate the time for the next GC run. Even if we should run GC
  550. // directly, give the system a while to get up and running and do other
  551. // stuff first. (We might have migrations and stuff which would be
  552. // better off running before GC.)
  553. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  554. if next < time.Minute {
  555. next = time.Minute
  556. }
  557. t := time.NewTimer(next)
  558. defer t.Stop()
  559. for {
  560. select {
  561. case <-ctx.Done():
  562. return ctx.Err()
  563. case <-t.C:
  564. if err := db.gcIndirect(ctx); err != nil {
  565. l.Warnln("Database indirection GC failed:", err)
  566. }
  567. db.recordTime(indirectGCTimeKey)
  568. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  569. }
  570. }
  571. }
  572. // recordTime records the current time under the given key, affecting the
  573. // next call to timeUntil with the same key.
  574. func (db *Lowlevel) recordTime(key string) {
  575. miscDB := NewMiscDataNamespace(db)
  576. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  577. }
  578. // timeUntil returns how long we should wait until the next interval, or
  579. // zero if it should happen directly.
  580. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  581. miscDB := NewMiscDataNamespace(db)
  582. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  583. nextTime := time.Unix(lastTime, 0).Add(every)
  584. sleepTime := time.Until(nextTime)
  585. if sleepTime < 0 {
  586. sleepTime = 0
  587. }
  588. return sleepTime
  589. }
  590. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  591. // The indirection GC uses bloom filters to track used block lists and
  592. // versions. This means iterating over all items, adding their hashes to
  593. // the filter, then iterating over the indirected items and removing
  594. // those that don't match the filter. The filter will give false
  595. // positives so we will keep around one percent of things that we don't
  596. // really need (at most).
  597. //
  598. // Indirection GC needs to run when there are no modifications to the
  599. // FileInfos or indirected items.
  600. db.gcMut.Lock()
  601. defer db.gcMut.Unlock()
  602. t, err := db.newReadWriteTransaction()
  603. if err != nil {
  604. return err
  605. }
  606. defer t.Release()
  607. // Set up the bloom filters with the initial capacity and false positive
  608. // rate, or higher capacity if we've done this before and seen lots of
  609. // items. For simplicity's sake we track just one count, which is the
  610. // highest of the various indirected items.
  611. capacity := indirectGCBloomCapacity
  612. if db.gcKeyCount > capacity {
  613. capacity = db.gcKeyCount
  614. }
  615. blockFilter := newBloomFilter(capacity)
  616. versionFilter := newBloomFilter(capacity)
  617. // Iterate the FileInfos, unmarshal the block and version hashes and
  618. // add them to the filter.
  619. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  620. if err != nil {
  621. return err
  622. }
  623. defer it.Release()
  624. for it.Next() {
  625. select {
  626. case <-ctx.Done():
  627. return ctx.Err()
  628. default:
  629. }
  630. var hashes IndirectionHashesOnly
  631. if err := hashes.Unmarshal(it.Value()); err != nil {
  632. return err
  633. }
  634. if len(hashes.BlocksHash) > 0 {
  635. blockFilter.add(hashes.BlocksHash)
  636. }
  637. if len(hashes.VersionHash) > 0 {
  638. versionFilter.add(hashes.VersionHash)
  639. }
  640. }
  641. it.Release()
  642. if err := it.Error(); err != nil {
  643. return err
  644. }
  645. // Iterate over block lists, removing keys with hashes that don't match
  646. // the filter.
  647. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  648. if err != nil {
  649. return err
  650. }
  651. defer it.Release()
  652. matchedBlocks := 0
  653. for it.Next() {
  654. select {
  655. case <-ctx.Done():
  656. return ctx.Err()
  657. default:
  658. }
  659. key := blockListKey(it.Key())
  660. if blockFilter.has(key.Hash()) {
  661. matchedBlocks++
  662. continue
  663. }
  664. if err := t.Delete(key); err != nil {
  665. return err
  666. }
  667. }
  668. it.Release()
  669. if err := it.Error(); err != nil {
  670. return err
  671. }
  672. // Iterate over version lists, removing keys with hashes that don't match
  673. // the filter.
  674. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  675. if err != nil {
  676. return err
  677. }
  678. matchedVersions := 0
  679. for it.Next() {
  680. select {
  681. case <-ctx.Done():
  682. return ctx.Err()
  683. default:
  684. }
  685. key := versionKey(it.Key())
  686. if versionFilter.has(key.Hash()) {
  687. matchedVersions++
  688. continue
  689. }
  690. if err := t.Delete(key); err != nil {
  691. return err
  692. }
  693. }
  694. it.Release()
  695. if err := it.Error(); err != nil {
  696. return err
  697. }
  698. // Remember the number of unique keys we kept until the next pass.
  699. db.gcKeyCount = matchedBlocks
  700. if matchedVersions > matchedBlocks {
  701. db.gcKeyCount = matchedVersions
  702. }
  703. if err := t.Commit(); err != nil {
  704. return err
  705. }
  706. return db.Compact()
  707. }
  708. func newBloomFilter(capacity int) bloomFilter {
  709. var buf [16]byte
  710. io.ReadFull(rand.Reader, buf[:])
  711. return bloomFilter{
  712. f: blobloom.NewOptimized(blobloom.Config{
  713. Capacity: uint64(capacity),
  714. FPRate: indirectGCBloomFalsePositiveRate,
  715. MaxBits: 8 * indirectGCBloomMaxBytes,
  716. }),
  717. k0: binary.LittleEndian.Uint64(buf[:8]),
  718. k1: binary.LittleEndian.Uint64(buf[8:]),
  719. }
  720. }
  721. type bloomFilter struct {
  722. f *blobloom.Filter
  723. k0, k1 uint64 // Random key for SipHash.
  724. }
  725. func (b *bloomFilter) add(id []byte) { b.f.Add(b.hash(id)) }
  726. func (b *bloomFilter) has(id []byte) bool { return b.f.Has(b.hash(id)) }
  727. // Hash function for the bloomfilter: SipHash of the SHA-256.
  728. //
  729. // The randomization in SipHash means we get different collisions across
  730. // runs and colliding keys are not kept indefinitely.
  731. func (b *bloomFilter) hash(id []byte) uint64 {
  732. if len(id) != sha256.Size {
  733. panic("bug: bloomFilter.hash passed something not a SHA256 hash")
  734. }
  735. return siphash.Hash(b.k0, b.k1, id)
  736. }
  737. // checkRepair checks folder metadata and sequences for miscellaneous errors.
  738. func (db *Lowlevel) checkRepair() error {
  739. for _, folder := range db.ListFolders() {
  740. if _, err := db.getMetaAndCheck(folder); err != nil {
  741. return err
  742. }
  743. }
  744. return nil
  745. }
  746. func (db *Lowlevel) getMetaAndCheck(folder string) (*metadataTracker, error) {
  747. db.gcMut.RLock()
  748. defer db.gcMut.RUnlock()
  749. fixed, err := db.checkLocalNeed([]byte(folder))
  750. if err != nil {
  751. return nil, fmt.Errorf("checking local need: %w", err)
  752. }
  753. if fixed != 0 {
  754. l.Infof("Repaired %d local need entries for folder %v in database", fixed, folder)
  755. }
  756. meta, err := db.recalcMeta(folder)
  757. if err != nil {
  758. return nil, fmt.Errorf("recalculating metadata: %w", err)
  759. }
  760. fixed, err = db.repairSequenceGCLocked(folder, meta)
  761. if err != nil {
  762. return nil, fmt.Errorf("repairing sequences: %w", err)
  763. }
  764. if fixed != 0 {
  765. l.Infof("Repaired %d sequence entries for folder %v in database", fixed, folder)
  766. }
  767. return meta, nil
  768. }
  769. func (db *Lowlevel) loadMetadataTracker(folder string) (*metadataTracker, error) {
  770. meta := newMetadataTracker(db.keyer, db.evLogger)
  771. if err := meta.fromDB(db, []byte(folder)); err != nil {
  772. if err == errMetaInconsistent {
  773. l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
  774. } else {
  775. l.Infof("No stored folder metadata for %q; recalculating", folder)
  776. }
  777. return db.getMetaAndCheck(folder)
  778. }
  779. curSeq := meta.Sequence(protocol.LocalDeviceID)
  780. if metaOK, err := db.verifyLocalSequence(curSeq, folder); err != nil {
  781. return nil, fmt.Errorf("verifying sequences: %w", err)
  782. } else if !metaOK {
  783. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  784. return db.getMetaAndCheck(folder)
  785. }
  786. if age := time.Since(meta.Created()); age > db.recheckInterval {
  787. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, util.NiceDurationString(age))
  788. return db.getMetaAndCheck(folder)
  789. }
  790. return meta, nil
  791. }
  792. func (db *Lowlevel) recalcMeta(folderStr string) (*metadataTracker, error) {
  793. folder := []byte(folderStr)
  794. meta := newMetadataTracker(db.keyer, db.evLogger)
  795. if fixed, err := db.checkGlobals(folder); err != nil {
  796. return nil, fmt.Errorf("checking globals: %w", err)
  797. } else if fixed > 0 {
  798. l.Infof("Repaired %d global entries for folder %v in database", fixed, folderStr)
  799. }
  800. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  801. if err != nil {
  802. return nil, err
  803. }
  804. defer t.close()
  805. var deviceID protocol.DeviceID
  806. err = t.withAllFolderTruncated(folder, func(device []byte, f FileInfoTruncated) bool {
  807. copy(deviceID[:], device)
  808. meta.addFile(deviceID, f)
  809. return true
  810. })
  811. if err != nil {
  812. return nil, err
  813. }
  814. err = t.withGlobal(folder, nil, true, func(f protocol.FileIntf) bool {
  815. meta.addFile(protocol.GlobalDeviceID, f)
  816. return true
  817. })
  818. meta.emptyNeeded(protocol.LocalDeviceID)
  819. err = t.withNeed(folder, protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  820. meta.addNeeded(protocol.LocalDeviceID, f)
  821. return true
  822. })
  823. if err != nil {
  824. return nil, err
  825. }
  826. for _, device := range meta.devices() {
  827. meta.emptyNeeded(device)
  828. err = t.withNeed(folder, device[:], true, func(f protocol.FileIntf) bool {
  829. meta.addNeeded(device, f)
  830. return true
  831. })
  832. if err != nil {
  833. return nil, err
  834. }
  835. }
  836. meta.SetCreated()
  837. if err := t.Commit(); err != nil {
  838. return nil, err
  839. }
  840. return meta, nil
  841. }
  842. // Verify the local sequence number from actual sequence entries. Returns
  843. // true if it was all good, or false if a fixup was necessary.
  844. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) (bool, error) {
  845. // Walk the sequence index from the current (supposedly) highest
  846. // sequence number and raise the alarm if we get anything. This recovers
  847. // from the occasion where we have written sequence entries to disk but
  848. // not yet written new metadata to disk.
  849. //
  850. // Note that we can have the same thing happen for remote devices but
  851. // there it's not a problem -- we'll simply advertise a lower sequence
  852. // number than we've actually seen and receive some duplicate updates
  853. // and then be in sync again.
  854. t, err := db.newReadOnlyTransaction()
  855. if err != nil {
  856. return false, err
  857. }
  858. ok := true
  859. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  860. ok = false // we got something, which we should not have
  861. return false
  862. }); err != nil {
  863. return false, err
  864. }
  865. t.close()
  866. return ok, nil
  867. }
  868. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  869. // match those in the corresponding file entries. It returns the amount of fixed
  870. // entries.
  871. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  872. t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
  873. if err != nil {
  874. return 0, err
  875. }
  876. defer t.close()
  877. fixed := 0
  878. folder := []byte(folderStr)
  879. // First check that every file entry has a matching sequence entry
  880. // (this was previously db schema upgrade to 9).
  881. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  882. if err != nil {
  883. return 0, err
  884. }
  885. it, err := t.NewPrefixIterator(dk.WithoutName())
  886. if err != nil {
  887. return 0, err
  888. }
  889. defer it.Release()
  890. var sk sequenceKey
  891. for it.Next() {
  892. intf, err := t.unmarshalTrunc(it.Value(), false)
  893. if err != nil {
  894. return 0, err
  895. }
  896. fi := intf.(protocol.FileInfo)
  897. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  898. return 0, err
  899. }
  900. switch dk, err = t.Get(sk); {
  901. case err != nil:
  902. if !backend.IsNotFound(err) {
  903. return 0, err
  904. }
  905. fallthrough
  906. case !bytes.Equal(it.Key(), dk):
  907. fixed++
  908. fi.Sequence = meta.nextLocalSeq()
  909. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  910. return 0, err
  911. }
  912. if err := t.Put(sk, it.Key()); err != nil {
  913. return 0, err
  914. }
  915. if err := t.putFile(it.Key(), fi); err != nil {
  916. return 0, err
  917. }
  918. }
  919. if err := t.Checkpoint(); err != nil {
  920. return 0, err
  921. }
  922. }
  923. if err := it.Error(); err != nil {
  924. return 0, err
  925. }
  926. it.Release()
  927. // Secondly check there's no sequence entries pointing at incorrect things.
  928. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  929. if err != nil {
  930. return 0, err
  931. }
  932. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  933. if err != nil {
  934. return 0, err
  935. }
  936. defer it.Release()
  937. for it.Next() {
  938. // Check that the sequence from the key matches the
  939. // sequence in the file.
  940. fi, ok, err := t.getFileTrunc(it.Value(), true)
  941. if err != nil {
  942. return 0, err
  943. }
  944. if ok {
  945. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  946. continue
  947. }
  948. }
  949. // Either the file is missing or has a different sequence number
  950. fixed++
  951. if err := t.Delete(it.Key()); err != nil {
  952. return 0, err
  953. }
  954. }
  955. if err := it.Error(); err != nil {
  956. return 0, err
  957. }
  958. it.Release()
  959. return fixed, t.Commit()
  960. }
  961. // Does not take care of metadata - if anything is repaired, the need count
  962. // needs to be recalculated.
  963. func (db *Lowlevel) checkLocalNeed(folder []byte) (int, error) {
  964. repaired := 0
  965. t, err := db.newReadWriteTransaction()
  966. if err != nil {
  967. return 0, err
  968. }
  969. defer t.close()
  970. key, err := t.keyer.GenerateNeedFileKey(nil, folder, nil)
  971. if err != nil {
  972. return 0, err
  973. }
  974. dbi, err := t.NewPrefixIterator(key.WithoutName())
  975. if err != nil {
  976. return 0, err
  977. }
  978. defer dbi.Release()
  979. var needName string
  980. var needDone bool
  981. next := func() {
  982. needDone = !dbi.Next()
  983. if !needDone {
  984. needName = string(t.keyer.NameFromGlobalVersionKey(dbi.Key()))
  985. }
  986. }
  987. next()
  988. t.withNeedIteratingGlobal(folder, protocol.LocalDeviceID[:], true, func(fi protocol.FileIntf) bool {
  989. f := fi.(FileInfoTruncated)
  990. for !needDone && needName < f.Name {
  991. repaired++
  992. if err = t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  993. return false
  994. }
  995. l.Debugln("check local need: removing", needName)
  996. next()
  997. }
  998. if needName == f.Name {
  999. next()
  1000. } else {
  1001. repaired++
  1002. key, err = t.keyer.GenerateNeedFileKey(key, folder, []byte(f.Name))
  1003. if err != nil {
  1004. return false
  1005. }
  1006. if err = t.Put(key, nil); err != nil {
  1007. return false
  1008. }
  1009. l.Debugln("check local need: adding", f.Name)
  1010. }
  1011. return true
  1012. })
  1013. if err != nil {
  1014. return 0, err
  1015. }
  1016. for !needDone {
  1017. repaired++
  1018. if err := t.Delete(dbi.Key()); err != nil && !backend.IsNotFound(err) {
  1019. return 0, err
  1020. }
  1021. l.Debugln("check local need: removing", needName)
  1022. next()
  1023. }
  1024. if err := dbi.Error(); err != nil {
  1025. return 0, err
  1026. }
  1027. dbi.Release()
  1028. if err = t.Commit(); err != nil {
  1029. return 0, err
  1030. }
  1031. return repaired, nil
  1032. }
  1033. func (db *Lowlevel) needsRepairPath() string {
  1034. path := db.Location()
  1035. if path == "" {
  1036. return ""
  1037. }
  1038. if path[len(path)-1] == fs.PathSeparator {
  1039. path = path[:len(path)-1]
  1040. }
  1041. return path + needsRepairSuffix
  1042. }
  1043. func (db *Lowlevel) checkErrorForRepair(err error) {
  1044. if errors.Is(err, errEntryFromGlobalMissing) || errors.Is(err, errEmptyGlobal) {
  1045. // Inconsistency error, mark db for repair on next start.
  1046. if path := db.needsRepairPath(); path != "" {
  1047. if fd, err := os.Create(path); err == nil {
  1048. fd.Close()
  1049. }
  1050. }
  1051. }
  1052. }
  1053. // unchanged checks if two files are the same and thus don't need to be updated.
  1054. // Local flags or the invalid bit might change without the version
  1055. // being bumped.
  1056. func unchanged(nf, ef protocol.FileIntf) bool {
  1057. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  1058. }
  1059. func (db *Lowlevel) handleFailure(err error) {
  1060. db.checkErrorForRepair(err)
  1061. if shouldReportFailure(err) {
  1062. db.evLogger.Log(events.Failure, err)
  1063. }
  1064. }
  // ldbPathRe matches error text of the form "open|write|read <path>: " where
  // the path has at least two leading components followed by a directory
  // whose name starts with "index" and a file inside it. Both forward and
  // backward slashes are accepted as separators.
  // NOTE(review): presumably targets LevelDB file errors, going by the name.
  var ldbPathRe = regexp.MustCompile(`(open|write|read) .+[\\/].+[\\/]index[^\\/]+[\\/][^\\/]+: `)

  // shouldReportFailure reports whether err should be reported, which is the
  // case for every error whose text is not matched by ldbPathRe.
  func shouldReportFailure(err error) bool {
  	msg := err.Error()
  	if ldbPathRe.MatchString(msg) {
  		return false
  	}
  	return true
  }