lowlevel.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "io"
  12. "time"
  13. "github.com/dchest/siphash"
  14. "github.com/greatroar/blobloom"
  15. "github.com/syncthing/syncthing/lib/db/backend"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "github.com/syncthing/syncthing/lib/rand"
  18. "github.com/syncthing/syncthing/lib/sha256"
  19. "github.com/syncthing/syncthing/lib/sync"
  20. "github.com/syncthing/syncthing/lib/util"
  21. "github.com/thejerf/suture"
  22. )
const (
	// We set the bloom filter capacity to handle 100k individual items with
	// a false positive probability of 1% for the first pass. Once we know
	// how many items we have we will use that number instead, if it's more
	// than 100k. For fewer than 100k items we will just get better false
	// positive rate instead.
	indirectGCBloomCapacity          = 100000
	indirectGCBloomFalsePositiveRate = 0.01     // 1%
	indirectGCBloomMaxBytes          = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items

	// Default time between indirection GC runs; NewLowlevel uses it unless
	// WithIndirectGCInterval overrides it.
	indirectGCDefaultInterval = 13 * time.Hour
	// Key under which the time of the last indirection GC run is recorded.
	indirectGCTimeKey = "lastIndirectGCTime"

	// Use indirection for the block list when it exceeds this many entries
	blocksIndirectionCutoff = 3
	// Use indirection for the version vector when it exceeds this many entries
	versionIndirectionCutoff = 10

	// Default for the recheck interval; stored folder metadata older than
	// the recheck interval is recalculated when it is loaded.
	recheckDefaultInterval = 30 * 24 * time.Hour
)
// Lowlevel is the lowest level database interface. It has a very simple
// purpose: hold the actual backend database, and the in-memory state
// that belong to that database. In the same way that a single on disk
// database can only be opened once, there should be only one Lowlevel for
// any given backend.
type Lowlevel struct {
	*suture.Supervisor // supervises the gcRunner service added by NewLowlevel
	backend.Backend    // the underlying key-value store
	folderIdx          *smallIndex
	deviceIdx          *smallIndex
	keyer              keyer
	// gcMut is taken for writing by the indirection GC and for reading by
	// the operations that update or drop file entries.
	gcMut sync.RWMutex
	// gcKeyCount is the larger of the kept block list / version counts from
	// the previous GC pass; it sizes the bloom filters of the next pass.
	gcKeyCount         int
	indirectGCInterval time.Duration // time between indirection GC runs
	recheckInterval    time.Duration // maximum age of stored metadata before it is recalculated
}
  56. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  57. db := &Lowlevel{
  58. Supervisor: suture.New("db.Lowlevel", suture.Spec{
  59. // Only log restarts in debug mode.
  60. Log: func(line string) {
  61. l.Debugln(line)
  62. },
  63. PassThroughPanics: true,
  64. }),
  65. Backend: backend,
  66. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  67. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  68. gcMut: sync.NewRWMutex(),
  69. indirectGCInterval: indirectGCDefaultInterval,
  70. recheckInterval: recheckDefaultInterval,
  71. }
  72. for _, opt := range opts {
  73. opt(db)
  74. }
  75. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  76. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  77. return db
  78. }
// Option is a function that adjusts a Lowlevel; NewLowlevel applies each
// given Option to the instance it creates.
type Option func(*Lowlevel)
  80. // WithRecheckInterval sets the time interval in between metadata recalculations
  81. // and consistency checks.
  82. func WithRecheckInterval(dur time.Duration) Option {
  83. return func(db *Lowlevel) {
  84. if dur > 0 {
  85. db.recheckInterval = dur
  86. }
  87. }
  88. }
  89. // WithIndirectGCInterval sets the time interval in between GC runs.
  90. func WithIndirectGCInterval(dur time.Duration) Option {
  91. return func(db *Lowlevel) {
  92. if dur > 0 {
  93. db.indirectGCInterval = dur
  94. }
  95. }
  96. }
// ListFolders returns the list of folders currently in the database, as
// reported by the folder index.
func (db *Lowlevel) ListFolders() []string {
	return db.folderIdx.Values()
}
  101. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  102. // global versionlist and metadata.
  103. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  104. db.gcMut.RLock()
  105. defer db.gcMut.RUnlock()
  106. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  107. if err != nil {
  108. return err
  109. }
  110. defer t.close()
  111. var dk, gk, keyBuf []byte
  112. devID, err := protocol.DeviceIDFromBytes(device)
  113. if err != nil {
  114. return err
  115. }
  116. for _, f := range fs {
  117. name := []byte(f.Name)
  118. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  119. if err != nil {
  120. return err
  121. }
  122. ef, ok, err := t.getFileTrunc(dk, true)
  123. if err != nil {
  124. return err
  125. }
  126. if ok && unchanged(f, ef) {
  127. l.Debugf("not inserting unchanged (remote); folder=%q device=%v %v", folder, devID, f)
  128. continue
  129. }
  130. if ok {
  131. meta.removeFile(devID, ef)
  132. }
  133. meta.addFile(devID, f)
  134. l.Debugf("insert (remote); folder=%q device=%v %v", folder, devID, f)
  135. if err := t.putFile(dk, f); err != nil {
  136. return err
  137. }
  138. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  139. if err != nil {
  140. return err
  141. }
  142. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  143. if err != nil {
  144. return err
  145. }
  146. if err := t.Checkpoint(); err != nil {
  147. return err
  148. }
  149. }
  150. return t.Commit()
  151. }
  152. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  153. // metadata, sequence and blockmap buckets.
  154. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  155. db.gcMut.RLock()
  156. defer db.gcMut.RUnlock()
  157. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  158. if err != nil {
  159. return err
  160. }
  161. defer t.close()
  162. var dk, gk, keyBuf []byte
  163. blockBuf := make([]byte, 4)
  164. for _, f := range fs {
  165. name := []byte(f.Name)
  166. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  167. if err != nil {
  168. return err
  169. }
  170. ef, ok, err := t.getFileByKey(dk)
  171. if err != nil {
  172. return err
  173. }
  174. if ok && unchanged(f, ef) {
  175. l.Debugf("not inserting unchanged (local); folder=%q %v", folder, f)
  176. continue
  177. }
  178. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  179. if ok {
  180. if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
  181. for _, block := range ef.Blocks {
  182. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  183. if err != nil {
  184. return err
  185. }
  186. if err := t.Delete(keyBuf); err != nil {
  187. return err
  188. }
  189. }
  190. if !blocksHashSame {
  191. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  192. if err != nil {
  193. return err
  194. }
  195. if err = t.Delete(keyBuf); err != nil {
  196. return err
  197. }
  198. }
  199. }
  200. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  201. if err != nil {
  202. return err
  203. }
  204. if err := t.Delete(keyBuf); err != nil {
  205. return err
  206. }
  207. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  208. }
  209. f.Sequence = meta.nextLocalSeq()
  210. if ok {
  211. meta.removeFile(protocol.LocalDeviceID, ef)
  212. }
  213. meta.addFile(protocol.LocalDeviceID, f)
  214. l.Debugf("insert (local); folder=%q %v", folder, f)
  215. if err := t.putFile(dk, f); err != nil {
  216. return err
  217. }
  218. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  219. if err != nil {
  220. return err
  221. }
  222. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  223. if err != nil {
  224. return err
  225. }
  226. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  227. if err != nil {
  228. return err
  229. }
  230. if err := t.Put(keyBuf, dk); err != nil {
  231. return err
  232. }
  233. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  234. if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
  235. for i, block := range f.Blocks {
  236. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  237. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  238. if err != nil {
  239. return err
  240. }
  241. if err := t.Put(keyBuf, blockBuf); err != nil {
  242. return err
  243. }
  244. }
  245. if !blocksHashSame {
  246. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  247. if err != nil {
  248. return err
  249. }
  250. if err = t.Put(keyBuf, nil); err != nil {
  251. return err
  252. }
  253. }
  254. }
  255. if err := t.Checkpoint(); err != nil {
  256. return err
  257. }
  258. }
  259. return t.Commit()
  260. }
  261. func (db *Lowlevel) dropFolder(folder []byte) error {
  262. db.gcMut.RLock()
  263. defer db.gcMut.RUnlock()
  264. t, err := db.newReadWriteTransaction()
  265. if err != nil {
  266. return err
  267. }
  268. defer t.close()
  269. // Remove all items related to the given folder from the device->file bucket
  270. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  271. if err != nil {
  272. return err
  273. }
  274. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  275. return err
  276. }
  277. // Remove all sequences related to the folder
  278. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  279. if err != nil {
  280. return err
  281. }
  282. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  283. return err
  284. }
  285. // Remove all items related to the given folder from the global bucket
  286. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  287. if err != nil {
  288. return err
  289. }
  290. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  291. return err
  292. }
  293. // Remove all needs related to the folder
  294. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  295. if err != nil {
  296. return err
  297. }
  298. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  299. return err
  300. }
  301. // Remove the blockmap of the folder
  302. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  303. if err != nil {
  304. return err
  305. }
  306. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  307. return err
  308. }
  309. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  310. if err != nil {
  311. return err
  312. }
  313. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  314. return err
  315. }
  316. return t.Commit()
  317. }
  318. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  319. db.gcMut.RLock()
  320. defer db.gcMut.RUnlock()
  321. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  322. if err != nil {
  323. return err
  324. }
  325. defer t.close()
  326. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  327. if err != nil {
  328. return err
  329. }
  330. dbi, err := t.NewPrefixIterator(key)
  331. if err != nil {
  332. return err
  333. }
  334. defer dbi.Release()
  335. var gk, keyBuf []byte
  336. for dbi.Next() {
  337. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  338. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  339. if err != nil {
  340. return err
  341. }
  342. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  343. if err != nil {
  344. return err
  345. }
  346. if err := t.Delete(dbi.Key()); err != nil {
  347. return err
  348. }
  349. if err := t.Checkpoint(); err != nil {
  350. return err
  351. }
  352. }
  353. dbi.Release()
  354. if err := dbi.Error(); err != nil {
  355. return err
  356. }
  357. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  358. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  359. if err != nil {
  360. return err
  361. }
  362. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  363. return err
  364. }
  365. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  366. if err != nil {
  367. return err
  368. }
  369. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  370. return err
  371. }
  372. }
  373. return t.Commit()
  374. }
  375. func (db *Lowlevel) checkGlobals(folder []byte) error {
  376. t, err := db.newReadWriteTransaction()
  377. if err != nil {
  378. return err
  379. }
  380. defer t.close()
  381. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  382. if err != nil {
  383. return err
  384. }
  385. dbi, err := t.NewPrefixIterator(key.WithoutName())
  386. if err != nil {
  387. return err
  388. }
  389. defer dbi.Release()
  390. var dk []byte
  391. ro := t.readOnlyTransaction
  392. for dbi.Next() {
  393. var vl VersionList
  394. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  395. if err := t.Delete(dbi.Key()); err != nil {
  396. return err
  397. }
  398. continue
  399. }
  400. // Check the global version list for consistency. An issue in previous
  401. // versions of goleveldb could result in reordered writes so that
  402. // there are global entries pointing to no longer existing files. Here
  403. // we find those and clear them out.
  404. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  405. newVL := &VersionList{}
  406. var changed, changedHere bool
  407. for _, fv := range vl.RawVersions {
  408. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  409. if err != nil {
  410. return err
  411. }
  412. changed = changed || changedHere
  413. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  414. if err != nil {
  415. return err
  416. }
  417. changed = changed || changedHere
  418. }
  419. if newVL.Empty() {
  420. if err := t.Delete(dbi.Key()); err != nil {
  421. return err
  422. }
  423. } else if changed {
  424. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  425. return err
  426. }
  427. }
  428. }
  429. dbi.Release()
  430. if err := dbi.Error(); err != nil {
  431. return err
  432. }
  433. l.Debugf("db check completed for %q", folder)
  434. return t.Commit()
  435. }
  436. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  437. var changed bool
  438. var err error
  439. for _, device := range devices {
  440. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  441. if err != nil {
  442. return false, err
  443. }
  444. f, ok, err := t.getFileTrunc(dk, false)
  445. if err != nil {
  446. return false, err
  447. }
  448. if !ok {
  449. changed = true
  450. continue
  451. }
  452. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  453. if err != nil {
  454. return false, err
  455. }
  456. }
  457. return changed, nil
  458. }
  459. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  460. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  461. if err != nil {
  462. return 0, err
  463. }
  464. cur, err := db.Get(key)
  465. if backend.IsNotFound(err) {
  466. return 0, nil
  467. } else if err != nil {
  468. return 0, err
  469. }
  470. var id protocol.IndexID
  471. if err := id.Unmarshal(cur); err != nil {
  472. return 0, nil
  473. }
  474. return id, nil
  475. }
  476. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  477. bs, _ := id.Marshal() // marshalling can't fail
  478. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  479. if err != nil {
  480. return err
  481. }
  482. return db.Put(key, bs)
  483. }
  484. func (db *Lowlevel) dropMtimes(folder []byte) error {
  485. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  486. if err != nil {
  487. return err
  488. }
  489. return db.dropPrefix(key)
  490. }
  491. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  492. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  493. if err != nil {
  494. return err
  495. }
  496. return db.dropPrefix(key)
  497. }
  498. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  499. t, err := db.newReadWriteTransaction()
  500. if err != nil {
  501. return err
  502. }
  503. defer t.close()
  504. if err := t.deleteKeyPrefix(prefix); err != nil {
  505. return err
  506. }
  507. return t.Commit()
  508. }
  509. func (db *Lowlevel) gcRunner(ctx context.Context) {
  510. // Calculate the time for the next GC run. Even if we should run GC
  511. // directly, give the system a while to get up and running and do other
  512. // stuff first. (We might have migrations and stuff which would be
  513. // better off running before GC.)
  514. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  515. if next < time.Minute {
  516. next = time.Minute
  517. }
  518. t := time.NewTimer(next)
  519. defer t.Stop()
  520. for {
  521. select {
  522. case <-ctx.Done():
  523. return
  524. case <-t.C:
  525. if err := db.gcIndirect(ctx); err != nil {
  526. l.Warnln("Database indirection GC failed:", err)
  527. }
  528. db.recordTime(indirectGCTimeKey)
  529. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  530. }
  531. }
  532. }
  533. // recordTime records the current time under the given key, affecting the
  534. // next call to timeUntil with the same key.
  535. func (db *Lowlevel) recordTime(key string) {
  536. miscDB := NewMiscDataNamespace(db)
  537. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  538. }
  539. // timeUntil returns how long we should wait until the next interval, or
  540. // zero if it should happen directly.
  541. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  542. miscDB := NewMiscDataNamespace(db)
  543. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  544. nextTime := time.Unix(lastTime, 0).Add(every)
  545. sleepTime := time.Until(nextTime)
  546. if sleepTime < 0 {
  547. sleepTime = 0
  548. }
  549. return sleepTime
  550. }
  551. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  552. // The indirection GC uses bloom filters to track used block lists and
  553. // versions. This means iterating over all items, adding their hashes to
  554. // the filter, then iterating over the indirected items and removing
  555. // those that don't match the filter. The filter will give false
  556. // positives so we will keep around one percent of things that we don't
  557. // really need (at most).
  558. //
  559. // Indirection GC needs to run when there are no modifications to the
  560. // FileInfos or indirected items.
  561. db.gcMut.Lock()
  562. defer db.gcMut.Unlock()
  563. t, err := db.newReadWriteTransaction()
  564. if err != nil {
  565. return err
  566. }
  567. defer t.Release()
  568. // Set up the bloom filters with the initial capacity and false positive
  569. // rate, or higher capacity if we've done this before and seen lots of
  570. // items. For simplicity's sake we track just one count, which is the
  571. // highest of the various indirected items.
  572. capacity := indirectGCBloomCapacity
  573. if db.gcKeyCount > capacity {
  574. capacity = db.gcKeyCount
  575. }
  576. blockFilter := newBloomFilter(capacity)
  577. versionFilter := newBloomFilter(capacity)
  578. // Iterate the FileInfos, unmarshal the block and version hashes and
  579. // add them to the filter.
  580. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  581. if err != nil {
  582. return err
  583. }
  584. defer it.Release()
  585. for it.Next() {
  586. select {
  587. case <-ctx.Done():
  588. return ctx.Err()
  589. default:
  590. }
  591. var hashes IndirectionHashesOnly
  592. if err := hashes.Unmarshal(it.Value()); err != nil {
  593. return err
  594. }
  595. if len(hashes.BlocksHash) > 0 {
  596. blockFilter.add(hashes.BlocksHash)
  597. }
  598. if len(hashes.VersionHash) > 0 {
  599. versionFilter.add(hashes.VersionHash)
  600. }
  601. }
  602. it.Release()
  603. if err := it.Error(); err != nil {
  604. return err
  605. }
  606. // Iterate over block lists, removing keys with hashes that don't match
  607. // the filter.
  608. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  609. if err != nil {
  610. return err
  611. }
  612. defer it.Release()
  613. matchedBlocks := 0
  614. for it.Next() {
  615. select {
  616. case <-ctx.Done():
  617. return ctx.Err()
  618. default:
  619. }
  620. key := blockListKey(it.Key())
  621. if blockFilter.has(key.Hash()) {
  622. matchedBlocks++
  623. continue
  624. }
  625. if err := t.Delete(key); err != nil {
  626. return err
  627. }
  628. }
  629. it.Release()
  630. if err := it.Error(); err != nil {
  631. return err
  632. }
  633. // Iterate over version lists, removing keys with hashes that don't match
  634. // the filter.
  635. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  636. if err != nil {
  637. return err
  638. }
  639. matchedVersions := 0
  640. for it.Next() {
  641. select {
  642. case <-ctx.Done():
  643. return ctx.Err()
  644. default:
  645. }
  646. key := versionKey(it.Key())
  647. if versionFilter.has(key.Hash()) {
  648. matchedVersions++
  649. continue
  650. }
  651. if err := t.Delete(key); err != nil {
  652. return err
  653. }
  654. }
  655. it.Release()
  656. if err := it.Error(); err != nil {
  657. return err
  658. }
  659. // Remember the number of unique keys we kept until the next pass.
  660. db.gcKeyCount = matchedBlocks
  661. if matchedVersions > matchedBlocks {
  662. db.gcKeyCount = matchedVersions
  663. }
  664. if err := t.Commit(); err != nil {
  665. return err
  666. }
  667. return db.Compact()
  668. }
  669. func newBloomFilter(capacity int) bloomFilter {
  670. var buf [16]byte
  671. io.ReadFull(rand.Reader, buf[:])
  672. return bloomFilter{
  673. f: blobloom.NewOptimized(blobloom.Config{
  674. Capacity: uint64(capacity),
  675. FPRate: indirectGCBloomFalsePositiveRate,
  676. MaxBits: 8 * indirectGCBloomMaxBytes,
  677. }),
  678. k0: binary.LittleEndian.Uint64(buf[:8]),
  679. k1: binary.LittleEndian.Uint64(buf[8:]),
  680. }
  681. }
// bloomFilter couples a blobloom filter with the SipHash key that is used
// to hash items before they are added to or looked up in the filter.
type bloomFilter struct {
	f      *blobloom.Filter
	k0, k1 uint64 // Random key for SipHash.
}
  686. func (b *bloomFilter) add(id []byte) { b.f.Add(b.hash(id)) }
  687. func (b *bloomFilter) has(id []byte) bool { return b.f.Has(b.hash(id)) }
  688. // Hash function for the bloomfilter: SipHash of the SHA-256.
  689. //
  690. // The randomization in SipHash means we get different collisions across
  691. // runs and colliding keys are not kept indefinitely.
  692. func (b *bloomFilter) hash(id []byte) uint64 {
  693. if len(id) != sha256.Size {
  694. panic("bug: bloomFilter.hash passed something not a SHA256 hash")
  695. }
  696. return siphash.Hash(b.k0, b.k1, id)
  697. }
  698. // CheckRepair checks folder metadata and sequences for miscellaneous errors.
  699. func (db *Lowlevel) CheckRepair() {
  700. for _, folder := range db.ListFolders() {
  701. _ = db.getMetaAndCheck(folder)
  702. }
  703. }
  704. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  705. db.gcMut.RLock()
  706. defer db.gcMut.RUnlock()
  707. meta, err := db.recalcMeta(folder)
  708. if err == nil {
  709. var fixed int
  710. fixed, err = db.repairSequenceGCLocked(folder, meta)
  711. if fixed != 0 {
  712. l.Infof("Repaired %d sequence entries in database", fixed)
  713. }
  714. }
  715. if backend.IsClosed(err) {
  716. return nil
  717. } else if err != nil {
  718. panic(err)
  719. }
  720. return meta
  721. }
  722. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  723. meta := newMetadataTracker(db.keyer)
  724. if err := meta.fromDB(db, []byte(folder)); err != nil {
  725. if err == errMetaInconsistent {
  726. l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
  727. } else {
  728. l.Infof("No stored folder metadata for %q; recalculating", folder)
  729. }
  730. return db.getMetaAndCheck(folder)
  731. }
  732. curSeq := meta.Sequence(protocol.LocalDeviceID)
  733. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  734. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  735. return db.getMetaAndCheck(folder)
  736. }
  737. if age := time.Since(meta.Created()); age > db.recheckInterval {
  738. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, util.NiceDurationString(age))
  739. return db.getMetaAndCheck(folder)
  740. }
  741. return meta
  742. }
  743. func (db *Lowlevel) recalcMeta(folderStr string) (*metadataTracker, error) {
  744. folder := []byte(folderStr)
  745. meta := newMetadataTracker(db.keyer)
  746. if err := db.checkGlobals(folder); err != nil {
  747. return nil, err
  748. }
  749. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  750. if err != nil {
  751. return nil, err
  752. }
  753. defer t.close()
  754. var deviceID protocol.DeviceID
  755. err = t.withAllFolderTruncated(folder, func(device []byte, f FileInfoTruncated) bool {
  756. copy(deviceID[:], device)
  757. meta.addFile(deviceID, f)
  758. return true
  759. })
  760. if err != nil {
  761. return nil, err
  762. }
  763. err = t.withGlobal(folder, nil, true, func(f protocol.FileIntf) bool {
  764. meta.addFile(protocol.GlobalDeviceID, f)
  765. return true
  766. })
  767. meta.emptyNeeded(protocol.LocalDeviceID)
  768. err = t.withNeed(folder, protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  769. meta.addNeeded(protocol.LocalDeviceID, f)
  770. return true
  771. })
  772. if err != nil {
  773. return nil, err
  774. }
  775. for _, device := range meta.devices() {
  776. meta.emptyNeeded(device)
  777. err = t.withNeed(folder, device[:], true, func(f protocol.FileIntf) bool {
  778. meta.addNeeded(device, f)
  779. return true
  780. })
  781. if err != nil {
  782. return nil, err
  783. }
  784. }
  785. meta.SetCreated()
  786. if err := t.Commit(); err != nil {
  787. return nil, err
  788. }
  789. return meta, nil
  790. }
  791. // Verify the local sequence number from actual sequence entries. Returns
  792. // true if it was all good, or false if a fixup was necessary.
  793. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  794. // Walk the sequence index from the current (supposedly) highest
  795. // sequence number and raise the alarm if we get anything. This recovers
  796. // from the occasion where we have written sequence entries to disk but
  797. // not yet written new metadata to disk.
  798. //
  799. // Note that we can have the same thing happen for remote devices but
  800. // there it's not a problem -- we'll simply advertise a lower sequence
  801. // number than we've actually seen and receive some duplicate updates
  802. // and then be in sync again.
  803. t, err := db.newReadOnlyTransaction()
  804. if err != nil {
  805. panic(err)
  806. }
  807. ok := true
  808. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  809. ok = false // we got something, which we should not have
  810. return false
  811. }); err != nil && !backend.IsClosed(err) {
  812. panic(err)
  813. }
  814. t.close()
  815. return ok
  816. }
  817. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  818. // match those in the corresponding file entries. It returns the amount of fixed
  819. // entries.
  820. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  821. t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
  822. if err != nil {
  823. return 0, err
  824. }
  825. defer t.close()
  826. fixed := 0
  827. folder := []byte(folderStr)
  828. // First check that every file entry has a matching sequence entry
  829. // (this was previously db schema upgrade to 9).
  830. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  831. if err != nil {
  832. return 0, err
  833. }
  834. it, err := t.NewPrefixIterator(dk.WithoutName())
  835. if err != nil {
  836. return 0, err
  837. }
  838. defer it.Release()
  839. var sk sequenceKey
  840. for it.Next() {
  841. intf, err := t.unmarshalTrunc(it.Value(), false)
  842. if err != nil {
  843. return 0, err
  844. }
  845. fi := intf.(protocol.FileInfo)
  846. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  847. return 0, err
  848. }
  849. switch dk, err = t.Get(sk); {
  850. case err != nil:
  851. if !backend.IsNotFound(err) {
  852. return 0, err
  853. }
  854. fallthrough
  855. case !bytes.Equal(it.Key(), dk):
  856. fixed++
  857. fi.Sequence = meta.nextLocalSeq()
  858. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  859. return 0, err
  860. }
  861. if err := t.Put(sk, it.Key()); err != nil {
  862. return 0, err
  863. }
  864. if err := t.putFile(it.Key(), fi); err != nil {
  865. return 0, err
  866. }
  867. }
  868. if err := t.Checkpoint(); err != nil {
  869. return 0, err
  870. }
  871. }
  872. if err := it.Error(); err != nil {
  873. return 0, err
  874. }
  875. it.Release()
  876. // Secondly check there's no sequence entries pointing at incorrect things.
  877. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  878. if err != nil {
  879. return 0, err
  880. }
  881. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  882. if err != nil {
  883. return 0, err
  884. }
  885. defer it.Release()
  886. for it.Next() {
  887. // Check that the sequence from the key matches the
  888. // sequence in the file.
  889. fi, ok, err := t.getFileTrunc(it.Value(), true)
  890. if err != nil {
  891. return 0, err
  892. }
  893. if ok {
  894. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  895. continue
  896. }
  897. }
  898. // Either the file is missing or has a different sequence number
  899. fixed++
  900. if err := t.Delete(it.Key()); err != nil {
  901. return 0, err
  902. }
  903. }
  904. if err := it.Error(); err != nil {
  905. return 0, err
  906. }
  907. it.Release()
  908. return fixed, t.Commit()
  909. }
  910. // unchanged checks if two files are the same and thus don't need to be updated.
  911. // Local flags or the invalid bit might change without the version
  912. // being bumped.
  913. func unchanged(nf, ef protocol.FileIntf) bool {
  914. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  915. }