lowlevel.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "io"
  12. "time"
  13. "github.com/dchest/siphash"
  14. "github.com/greatroar/blobloom"
  15. "github.com/syncthing/syncthing/lib/db/backend"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "github.com/syncthing/syncthing/lib/rand"
  18. "github.com/syncthing/syncthing/lib/sha256"
  19. "github.com/syncthing/syncthing/lib/sync"
  20. "github.com/syncthing/syncthing/lib/util"
  21. "github.com/thejerf/suture"
  22. )
  23. const (
  24. // We set the bloom filter capacity to handle 100k individual items with
  25. // a false positive probability of 1% for the first pass. Once we know
  26. // how many items we have we will use that number instead, if it's more
  27. // than 100k. For fewer than 100k items we will just get better false
  28. // positive rate instead.
  29. indirectGCBloomCapacity = 100000
  30. indirectGCBloomFalsePositiveRate = 0.01 // 1%
  31. indirectGCBloomMaxBytes = 32 << 20 // Use at most 32MiB memory, which covers our desired FP rate at 27 M items
  32. indirectGCDefaultInterval = 13 * time.Hour
  33. indirectGCTimeKey = "lastIndirectGCTime"
  34. // Use indirection for the block list when it exceeds this many entries
  35. blocksIndirectionCutoff = 3
  36. // Use indirection for the version vector when it exceeds this many entries
  37. versionIndirectionCutoff = 10
  38. recheckDefaultInterval = 30 * 24 * time.Hour
  39. )
  40. // Lowlevel is the lowest level database interface. It has a very simple
  41. // purpose: hold the actual backend database, and the in-memory state
  42. // that belong to that database. In the same way that a single on disk
  43. // database can only be opened once, there should be only one Lowlevel for
  44. // any given backend.
  45. type Lowlevel struct {
  46. *suture.Supervisor
  47. backend.Backend
  48. folderIdx *smallIndex
  49. deviceIdx *smallIndex
  50. keyer keyer
  51. gcMut sync.RWMutex
  52. gcKeyCount int
  53. indirectGCInterval time.Duration
  54. recheckInterval time.Duration
  55. }
  56. func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
  57. db := &Lowlevel{
  58. Supervisor: suture.New("db.Lowlevel", suture.Spec{
  59. // Only log restarts in debug mode.
  60. Log: func(line string) {
  61. l.Debugln(line)
  62. },
  63. PassThroughPanics: true,
  64. }),
  65. Backend: backend,
  66. folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
  67. deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
  68. gcMut: sync.NewRWMutex(),
  69. indirectGCInterval: indirectGCDefaultInterval,
  70. recheckInterval: recheckDefaultInterval,
  71. }
  72. for _, opt := range opts {
  73. opt(db)
  74. }
  75. db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx)
  76. db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner"))
  77. return db
  78. }
  79. type Option func(*Lowlevel)
  80. // WithRecheckInterval sets the time interval in between metadata recalculations
  81. // and consistency checks.
  82. func WithRecheckInterval(dur time.Duration) Option {
  83. return func(db *Lowlevel) {
  84. if dur > 0 {
  85. db.recheckInterval = dur
  86. }
  87. }
  88. }
  89. // WithIndirectGCInterval sets the time interval in between GC runs.
  90. func WithIndirectGCInterval(dur time.Duration) Option {
  91. return func(db *Lowlevel) {
  92. if dur > 0 {
  93. db.indirectGCInterval = dur
  94. }
  95. }
  96. }
  97. // ListFolders returns the list of folders currently in the database
  98. func (db *Lowlevel) ListFolders() []string {
  99. return db.folderIdx.Values()
  100. }
  101. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  102. // global versionlist and metadata.
  103. func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  104. db.gcMut.RLock()
  105. defer db.gcMut.RUnlock()
  106. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  107. if err != nil {
  108. return err
  109. }
  110. defer t.close()
  111. var dk, gk, keyBuf []byte
  112. devID, err := protocol.DeviceIDFromBytes(device)
  113. if err != nil {
  114. return err
  115. }
  116. for _, f := range fs {
  117. name := []byte(f.Name)
  118. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  119. if err != nil {
  120. return err
  121. }
  122. ef, ok, err := t.getFileTrunc(dk, true)
  123. if err != nil {
  124. return err
  125. }
  126. if ok && unchanged(f, ef) {
  127. continue
  128. }
  129. if ok {
  130. meta.removeFile(devID, ef)
  131. }
  132. meta.addFile(devID, f)
  133. l.Debugf("insert; folder=%q device=%v %v", folder, devID, f)
  134. if err := t.putFile(dk, f); err != nil {
  135. return err
  136. }
  137. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  138. if err != nil {
  139. return err
  140. }
  141. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  142. if err != nil {
  143. return err
  144. }
  145. if err := t.Checkpoint(); err != nil {
  146. return err
  147. }
  148. }
  149. return t.Commit()
  150. }
  151. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  152. // metadata, sequence and blockmap buckets.
  153. func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) error {
  154. db.gcMut.RLock()
  155. defer db.gcMut.RUnlock()
  156. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  157. if err != nil {
  158. return err
  159. }
  160. defer t.close()
  161. var dk, gk, keyBuf []byte
  162. blockBuf := make([]byte, 4)
  163. for _, f := range fs {
  164. name := []byte(f.Name)
  165. dk, err = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  166. if err != nil {
  167. return err
  168. }
  169. ef, ok, err := t.getFileByKey(dk)
  170. if err != nil {
  171. return err
  172. }
  173. if ok && unchanged(f, ef) {
  174. continue
  175. }
  176. blocksHashSame := ok && bytes.Equal(ef.BlocksHash, f.BlocksHash)
  177. if ok {
  178. if len(ef.Blocks) != 0 && !ef.IsInvalid() && ef.Size > 0 {
  179. for _, block := range ef.Blocks {
  180. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  181. if err != nil {
  182. return err
  183. }
  184. if err := t.Delete(keyBuf); err != nil {
  185. return err
  186. }
  187. }
  188. if !blocksHashSame {
  189. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, ef.BlocksHash, name)
  190. if err != nil {
  191. return err
  192. }
  193. if err = t.Delete(keyBuf); err != nil {
  194. return err
  195. }
  196. }
  197. }
  198. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  199. if err != nil {
  200. return err
  201. }
  202. if err := t.Delete(keyBuf); err != nil {
  203. return err
  204. }
  205. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  206. }
  207. f.Sequence = meta.nextLocalSeq()
  208. if ok {
  209. meta.removeFile(protocol.LocalDeviceID, ef)
  210. }
  211. meta.addFile(protocol.LocalDeviceID, f)
  212. l.Debugf("insert (local); folder=%q %v", folder, f)
  213. if err := t.putFile(dk, f); err != nil {
  214. return err
  215. }
  216. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  217. if err != nil {
  218. return err
  219. }
  220. keyBuf, _, err = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  221. if err != nil {
  222. return err
  223. }
  224. keyBuf, err = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  225. if err != nil {
  226. return err
  227. }
  228. if err := t.Put(keyBuf, dk); err != nil {
  229. return err
  230. }
  231. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  232. if len(f.Blocks) != 0 && !f.IsInvalid() && f.Size > 0 {
  233. for i, block := range f.Blocks {
  234. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  235. keyBuf, err = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  236. if err != nil {
  237. return err
  238. }
  239. if err := t.Put(keyBuf, blockBuf); err != nil {
  240. return err
  241. }
  242. }
  243. if !blocksHashSame {
  244. keyBuf, err := db.keyer.GenerateBlockListMapKey(keyBuf, folder, f.BlocksHash, name)
  245. if err != nil {
  246. return err
  247. }
  248. if err = t.Put(keyBuf, nil); err != nil {
  249. return err
  250. }
  251. }
  252. }
  253. if err := t.Checkpoint(); err != nil {
  254. return err
  255. }
  256. }
  257. return t.Commit()
  258. }
  259. func (db *Lowlevel) dropFolder(folder []byte) error {
  260. db.gcMut.RLock()
  261. defer db.gcMut.RUnlock()
  262. t, err := db.newReadWriteTransaction()
  263. if err != nil {
  264. return err
  265. }
  266. defer t.close()
  267. // Remove all items related to the given folder from the device->file bucket
  268. k0, err := db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil)
  269. if err != nil {
  270. return err
  271. }
  272. if err := t.deleteKeyPrefix(k0.WithoutNameAndDevice()); err != nil {
  273. return err
  274. }
  275. // Remove all sequences related to the folder
  276. k1, err := db.keyer.GenerateSequenceKey(k0, folder, 0)
  277. if err != nil {
  278. return err
  279. }
  280. if err := t.deleteKeyPrefix(k1.WithoutSequence()); err != nil {
  281. return err
  282. }
  283. // Remove all items related to the given folder from the global bucket
  284. k2, err := db.keyer.GenerateGlobalVersionKey(k1, folder, nil)
  285. if err != nil {
  286. return err
  287. }
  288. if err := t.deleteKeyPrefix(k2.WithoutName()); err != nil {
  289. return err
  290. }
  291. // Remove all needs related to the folder
  292. k3, err := db.keyer.GenerateNeedFileKey(k2, folder, nil)
  293. if err != nil {
  294. return err
  295. }
  296. if err := t.deleteKeyPrefix(k3.WithoutName()); err != nil {
  297. return err
  298. }
  299. // Remove the blockmap of the folder
  300. k4, err := db.keyer.GenerateBlockMapKey(k3, folder, nil, nil)
  301. if err != nil {
  302. return err
  303. }
  304. if err := t.deleteKeyPrefix(k4.WithoutHashAndName()); err != nil {
  305. return err
  306. }
  307. k5, err := db.keyer.GenerateBlockListMapKey(k4, folder, nil, nil)
  308. if err != nil {
  309. return err
  310. }
  311. if err := t.deleteKeyPrefix(k5.WithoutHashAndName()); err != nil {
  312. return err
  313. }
  314. return t.Commit()
  315. }
  316. func (db *Lowlevel) dropDeviceFolder(device, folder []byte, meta *metadataTracker) error {
  317. db.gcMut.RLock()
  318. defer db.gcMut.RUnlock()
  319. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  320. if err != nil {
  321. return err
  322. }
  323. defer t.close()
  324. key, err := db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)
  325. if err != nil {
  326. return err
  327. }
  328. dbi, err := t.NewPrefixIterator(key)
  329. if err != nil {
  330. return err
  331. }
  332. defer dbi.Release()
  333. var gk, keyBuf []byte
  334. for dbi.Next() {
  335. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  336. gk, err = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  337. if err != nil {
  338. return err
  339. }
  340. keyBuf, err = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  341. if err != nil {
  342. return err
  343. }
  344. if err := t.Delete(dbi.Key()); err != nil {
  345. return err
  346. }
  347. if err := t.Checkpoint(); err != nil {
  348. return err
  349. }
  350. }
  351. dbi.Release()
  352. if err := dbi.Error(); err != nil {
  353. return err
  354. }
  355. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  356. key, err := db.keyer.GenerateBlockMapKey(nil, folder, nil, nil)
  357. if err != nil {
  358. return err
  359. }
  360. if err := t.deleteKeyPrefix(key.WithoutHashAndName()); err != nil {
  361. return err
  362. }
  363. key2, err := db.keyer.GenerateBlockListMapKey(key, folder, nil, nil)
  364. if err != nil {
  365. return err
  366. }
  367. if err := t.deleteKeyPrefix(key2.WithoutHashAndName()); err != nil {
  368. return err
  369. }
  370. }
  371. return t.Commit()
  372. }
  373. func (db *Lowlevel) checkGlobals(folder []byte) error {
  374. t, err := db.newReadWriteTransaction()
  375. if err != nil {
  376. return err
  377. }
  378. defer t.close()
  379. key, err := db.keyer.GenerateGlobalVersionKey(nil, folder, nil)
  380. if err != nil {
  381. return err
  382. }
  383. dbi, err := t.NewPrefixIterator(key.WithoutName())
  384. if err != nil {
  385. return err
  386. }
  387. defer dbi.Release()
  388. var dk []byte
  389. ro := t.readOnlyTransaction
  390. for dbi.Next() {
  391. var vl VersionList
  392. if err := vl.Unmarshal(dbi.Value()); err != nil || vl.Empty() {
  393. if err := t.Delete(dbi.Key()); err != nil {
  394. return err
  395. }
  396. continue
  397. }
  398. // Check the global version list for consistency. An issue in previous
  399. // versions of goleveldb could result in reordered writes so that
  400. // there are global entries pointing to no longer existing files. Here
  401. // we find those and clear them out.
  402. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  403. newVL := &VersionList{}
  404. var changed, changedHere bool
  405. for _, fv := range vl.RawVersions {
  406. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.Devices, newVL, ro)
  407. if err != nil {
  408. return err
  409. }
  410. changed = changed || changedHere
  411. changedHere, err = checkGlobalsFilterDevices(dk, folder, name, fv.InvalidDevices, newVL, ro)
  412. if err != nil {
  413. return err
  414. }
  415. changed = changed || changedHere
  416. }
  417. if newVL.Empty() {
  418. if err := t.Delete(dbi.Key()); err != nil {
  419. return err
  420. }
  421. } else if changed {
  422. if err := t.Put(dbi.Key(), mustMarshal(newVL)); err != nil {
  423. return err
  424. }
  425. }
  426. }
  427. dbi.Release()
  428. if err := dbi.Error(); err != nil {
  429. return err
  430. }
  431. l.Debugf("db check completed for %q", folder)
  432. return t.Commit()
  433. }
  434. func checkGlobalsFilterDevices(dk, folder, name []byte, devices [][]byte, vl *VersionList, t readOnlyTransaction) (bool, error) {
  435. var changed bool
  436. var err error
  437. for _, device := range devices {
  438. dk, err = t.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  439. if err != nil {
  440. return false, err
  441. }
  442. f, ok, err := t.getFileTrunc(dk, false)
  443. if err != nil {
  444. return false, err
  445. }
  446. if !ok {
  447. changed = true
  448. continue
  449. }
  450. _, _, _, _, _, _, err = vl.update(folder, device, f, t)
  451. if err != nil {
  452. return false, err
  453. }
  454. }
  455. return changed, nil
  456. }
  457. func (db *Lowlevel) getIndexID(device, folder []byte) (protocol.IndexID, error) {
  458. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  459. if err != nil {
  460. return 0, err
  461. }
  462. cur, err := db.Get(key)
  463. if backend.IsNotFound(err) {
  464. return 0, nil
  465. } else if err != nil {
  466. return 0, err
  467. }
  468. var id protocol.IndexID
  469. if err := id.Unmarshal(cur); err != nil {
  470. return 0, nil
  471. }
  472. return id, nil
  473. }
  474. func (db *Lowlevel) setIndexID(device, folder []byte, id protocol.IndexID) error {
  475. bs, _ := id.Marshal() // marshalling can't fail
  476. key, err := db.keyer.GenerateIndexIDKey(nil, device, folder)
  477. if err != nil {
  478. return err
  479. }
  480. return db.Put(key, bs)
  481. }
  482. func (db *Lowlevel) dropMtimes(folder []byte) error {
  483. key, err := db.keyer.GenerateMtimesKey(nil, folder)
  484. if err != nil {
  485. return err
  486. }
  487. return db.dropPrefix(key)
  488. }
  489. func (db *Lowlevel) dropFolderMeta(folder []byte) error {
  490. key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
  491. if err != nil {
  492. return err
  493. }
  494. return db.dropPrefix(key)
  495. }
  496. func (db *Lowlevel) dropPrefix(prefix []byte) error {
  497. t, err := db.newReadWriteTransaction()
  498. if err != nil {
  499. return err
  500. }
  501. defer t.close()
  502. if err := t.deleteKeyPrefix(prefix); err != nil {
  503. return err
  504. }
  505. return t.Commit()
  506. }
  507. func (db *Lowlevel) gcRunner(ctx context.Context) {
  508. // Calculate the time for the next GC run. Even if we should run GC
  509. // directly, give the system a while to get up and running and do other
  510. // stuff first. (We might have migrations and stuff which would be
  511. // better off running before GC.)
  512. next := db.timeUntil(indirectGCTimeKey, db.indirectGCInterval)
  513. if next < time.Minute {
  514. next = time.Minute
  515. }
  516. t := time.NewTimer(next)
  517. defer t.Stop()
  518. for {
  519. select {
  520. case <-ctx.Done():
  521. return
  522. case <-t.C:
  523. if err := db.gcIndirect(ctx); err != nil {
  524. l.Warnln("Database indirection GC failed:", err)
  525. }
  526. db.recordTime(indirectGCTimeKey)
  527. t.Reset(db.timeUntil(indirectGCTimeKey, db.indirectGCInterval))
  528. }
  529. }
  530. }
  531. // recordTime records the current time under the given key, affecting the
  532. // next call to timeUntil with the same key.
  533. func (db *Lowlevel) recordTime(key string) {
  534. miscDB := NewMiscDataNamespace(db)
  535. _ = miscDB.PutInt64(key, time.Now().Unix()) // error wilfully ignored
  536. }
  537. // timeUntil returns how long we should wait until the next interval, or
  538. // zero if it should happen directly.
  539. func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
  540. miscDB := NewMiscDataNamespace(db)
  541. lastTime, _, _ := miscDB.Int64(key) // error wilfully ignored
  542. nextTime := time.Unix(lastTime, 0).Add(every)
  543. sleepTime := time.Until(nextTime)
  544. if sleepTime < 0 {
  545. sleepTime = 0
  546. }
  547. return sleepTime
  548. }
  549. func (db *Lowlevel) gcIndirect(ctx context.Context) error {
  550. // The indirection GC uses bloom filters to track used block lists and
  551. // versions. This means iterating over all items, adding their hashes to
  552. // the filter, then iterating over the indirected items and removing
  553. // those that don't match the filter. The filter will give false
  554. // positives so we will keep around one percent of things that we don't
  555. // really need (at most).
  556. //
  557. // Indirection GC needs to run when there are no modifications to the
  558. // FileInfos or indirected items.
  559. db.gcMut.Lock()
  560. defer db.gcMut.Unlock()
  561. t, err := db.newReadWriteTransaction()
  562. if err != nil {
  563. return err
  564. }
  565. defer t.Release()
  566. // Set up the bloom filters with the initial capacity and false positive
  567. // rate, or higher capacity if we've done this before and seen lots of
  568. // items. For simplicity's sake we track just one count, which is the
  569. // highest of the various indirected items.
  570. capacity := indirectGCBloomCapacity
  571. if db.gcKeyCount > capacity {
  572. capacity = db.gcKeyCount
  573. }
  574. blockFilter := newBloomFilter(capacity)
  575. versionFilter := newBloomFilter(capacity)
  576. // Iterate the FileInfos, unmarshal the block and version hashes and
  577. // add them to the filter.
  578. it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
  579. if err != nil {
  580. return err
  581. }
  582. defer it.Release()
  583. for it.Next() {
  584. select {
  585. case <-ctx.Done():
  586. return ctx.Err()
  587. default:
  588. }
  589. var hashes IndirectionHashesOnly
  590. if err := hashes.Unmarshal(it.Value()); err != nil {
  591. return err
  592. }
  593. if len(hashes.BlocksHash) > 0 {
  594. blockFilter.add(hashes.BlocksHash)
  595. }
  596. if len(hashes.VersionHash) > 0 {
  597. versionFilter.add(hashes.VersionHash)
  598. }
  599. }
  600. it.Release()
  601. if err := it.Error(); err != nil {
  602. return err
  603. }
  604. // Iterate over block lists, removing keys with hashes that don't match
  605. // the filter.
  606. it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
  607. if err != nil {
  608. return err
  609. }
  610. defer it.Release()
  611. matchedBlocks := 0
  612. for it.Next() {
  613. select {
  614. case <-ctx.Done():
  615. return ctx.Err()
  616. default:
  617. }
  618. key := blockListKey(it.Key())
  619. if blockFilter.has(key.Hash()) {
  620. matchedBlocks++
  621. continue
  622. }
  623. if err := t.Delete(key); err != nil {
  624. return err
  625. }
  626. }
  627. it.Release()
  628. if err := it.Error(); err != nil {
  629. return err
  630. }
  631. // Iterate over version lists, removing keys with hashes that don't match
  632. // the filter.
  633. it, err = db.NewPrefixIterator([]byte{KeyTypeVersion})
  634. if err != nil {
  635. return err
  636. }
  637. matchedVersions := 0
  638. for it.Next() {
  639. select {
  640. case <-ctx.Done():
  641. return ctx.Err()
  642. default:
  643. }
  644. key := versionKey(it.Key())
  645. if versionFilter.has(key.Hash()) {
  646. matchedVersions++
  647. continue
  648. }
  649. if err := t.Delete(key); err != nil {
  650. return err
  651. }
  652. }
  653. it.Release()
  654. if err := it.Error(); err != nil {
  655. return err
  656. }
  657. // Remember the number of unique keys we kept until the next pass.
  658. db.gcKeyCount = matchedBlocks
  659. if matchedVersions > matchedBlocks {
  660. db.gcKeyCount = matchedVersions
  661. }
  662. if err := t.Commit(); err != nil {
  663. return err
  664. }
  665. return db.Compact()
  666. }
  667. func newBloomFilter(capacity int) bloomFilter {
  668. var buf [16]byte
  669. io.ReadFull(rand.Reader, buf[:])
  670. return bloomFilter{
  671. f: blobloom.NewOptimized(blobloom.Config{
  672. Capacity: uint64(capacity),
  673. FPRate: indirectGCBloomFalsePositiveRate,
  674. MaxBits: 8 * indirectGCBloomMaxBytes,
  675. }),
  676. k0: binary.LittleEndian.Uint64(buf[:8]),
  677. k1: binary.LittleEndian.Uint64(buf[8:]),
  678. }
  679. }
  680. type bloomFilter struct {
  681. f *blobloom.Filter
  682. k0, k1 uint64 // Random key for SipHash.
  683. }
  684. func (b *bloomFilter) add(id []byte) { b.f.Add(b.hash(id)) }
  685. func (b *bloomFilter) has(id []byte) bool { return b.f.Has(b.hash(id)) }
  686. // Hash function for the bloomfilter: SipHash of the SHA-256.
  687. //
  688. // The randomization in SipHash means we get different collisions across
  689. // runs and colliding keys are not kept indefinitely.
  690. func (b *bloomFilter) hash(id []byte) uint64 {
  691. if len(id) != sha256.Size {
  692. panic("bug: bloomFilter.hash passed something not a SHA256 hash")
  693. }
  694. return siphash.Hash(b.k0, b.k1, id)
  695. }
  696. // CheckRepair checks folder metadata and sequences for miscellaneous errors.
  697. func (db *Lowlevel) CheckRepair() {
  698. for _, folder := range db.ListFolders() {
  699. _ = db.getMetaAndCheck(folder)
  700. }
  701. }
  702. func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
  703. db.gcMut.RLock()
  704. defer db.gcMut.RUnlock()
  705. meta, err := db.recalcMeta(folder)
  706. if err == nil {
  707. var fixed int
  708. fixed, err = db.repairSequenceGCLocked(folder, meta)
  709. if fixed != 0 {
  710. l.Infof("Repaired %d sequence entries in database", fixed)
  711. }
  712. }
  713. if backend.IsClosed(err) {
  714. return nil
  715. } else if err != nil {
  716. panic(err)
  717. }
  718. return meta
  719. }
  720. func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
  721. meta := newMetadataTracker(db.keyer)
  722. if err := meta.fromDB(db, []byte(folder)); err != nil {
  723. if err == errMetaInconsistent {
  724. l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
  725. } else {
  726. l.Infof("No stored folder metadata for %q; recalculating", folder)
  727. }
  728. return db.getMetaAndCheck(folder)
  729. }
  730. curSeq := meta.Sequence(protocol.LocalDeviceID)
  731. if metaOK := db.verifyLocalSequence(curSeq, folder); !metaOK {
  732. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  733. return db.getMetaAndCheck(folder)
  734. }
  735. if age := time.Since(meta.Created()); age > db.recheckInterval {
  736. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, util.NiceDurationString(age))
  737. return db.getMetaAndCheck(folder)
  738. }
  739. return meta
  740. }
  741. func (db *Lowlevel) recalcMeta(folderStr string) (*metadataTracker, error) {
  742. folder := []byte(folderStr)
  743. meta := newMetadataTracker(db.keyer)
  744. if err := db.checkGlobals(folder); err != nil {
  745. return nil, err
  746. }
  747. t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
  748. if err != nil {
  749. return nil, err
  750. }
  751. defer t.close()
  752. var deviceID protocol.DeviceID
  753. err = t.withAllFolderTruncated(folder, func(device []byte, f FileInfoTruncated) bool {
  754. copy(deviceID[:], device)
  755. meta.addFile(deviceID, f)
  756. return true
  757. })
  758. if err != nil {
  759. return nil, err
  760. }
  761. err = t.withGlobal(folder, nil, true, func(f protocol.FileIntf) bool {
  762. meta.addFile(protocol.GlobalDeviceID, f)
  763. return true
  764. })
  765. meta.emptyNeeded(protocol.LocalDeviceID)
  766. err = t.withNeed(folder, protocol.LocalDeviceID[:], true, func(f protocol.FileIntf) bool {
  767. meta.addNeeded(protocol.LocalDeviceID, f)
  768. return true
  769. })
  770. if err != nil {
  771. return nil, err
  772. }
  773. for _, device := range meta.devices() {
  774. meta.emptyNeeded(device)
  775. err = t.withNeed(folder, device[:], true, func(f protocol.FileIntf) bool {
  776. meta.addNeeded(device, f)
  777. return true
  778. })
  779. if err != nil {
  780. return nil, err
  781. }
  782. }
  783. meta.SetCreated()
  784. if err := t.Commit(); err != nil {
  785. return nil, err
  786. }
  787. return meta, nil
  788. }
  789. // Verify the local sequence number from actual sequence entries. Returns
  790. // true if it was all good, or false if a fixup was necessary.
  791. func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
  792. // Walk the sequence index from the current (supposedly) highest
  793. // sequence number and raise the alarm if we get anything. This recovers
  794. // from the occasion where we have written sequence entries to disk but
  795. // not yet written new metadata to disk.
  796. //
  797. // Note that we can have the same thing happen for remote devices but
  798. // there it's not a problem -- we'll simply advertise a lower sequence
  799. // number than we've actually seen and receive some duplicate updates
  800. // and then be in sync again.
  801. t, err := db.newReadOnlyTransaction()
  802. if err != nil {
  803. panic(err)
  804. }
  805. ok := true
  806. if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi protocol.FileIntf) bool {
  807. ok = false // we got something, which we should not have
  808. return false
  809. }); err != nil && !backend.IsClosed(err) {
  810. panic(err)
  811. }
  812. t.close()
  813. return ok
  814. }
  815. // repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
  816. // match those in the corresponding file entries. It returns the amount of fixed
  817. // entries.
  818. func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
  819. t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
  820. if err != nil {
  821. return 0, err
  822. }
  823. defer t.close()
  824. fixed := 0
  825. folder := []byte(folderStr)
  826. // First check that every file entry has a matching sequence entry
  827. // (this was previously db schema upgrade to 9).
  828. dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
  829. if err != nil {
  830. return 0, err
  831. }
  832. it, err := t.NewPrefixIterator(dk.WithoutName())
  833. if err != nil {
  834. return 0, err
  835. }
  836. defer it.Release()
  837. var sk sequenceKey
  838. for it.Next() {
  839. intf, err := t.unmarshalTrunc(it.Value(), false)
  840. if err != nil {
  841. return 0, err
  842. }
  843. fi := intf.(protocol.FileInfo)
  844. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  845. return 0, err
  846. }
  847. switch dk, err = t.Get(sk); {
  848. case err != nil:
  849. if !backend.IsNotFound(err) {
  850. return 0, err
  851. }
  852. fallthrough
  853. case !bytes.Equal(it.Key(), dk):
  854. fixed++
  855. fi.Sequence = meta.nextLocalSeq()
  856. if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
  857. return 0, err
  858. }
  859. if err := t.Put(sk, it.Key()); err != nil {
  860. return 0, err
  861. }
  862. if err := t.putFile(it.Key(), fi); err != nil {
  863. return 0, err
  864. }
  865. }
  866. if err := t.Checkpoint(); err != nil {
  867. return 0, err
  868. }
  869. }
  870. if err := it.Error(); err != nil {
  871. return 0, err
  872. }
  873. it.Release()
  874. // Secondly check there's no sequence entries pointing at incorrect things.
  875. sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
  876. if err != nil {
  877. return 0, err
  878. }
  879. it, err = t.NewPrefixIterator(sk.WithoutSequence())
  880. if err != nil {
  881. return 0, err
  882. }
  883. defer it.Release()
  884. for it.Next() {
  885. // Check that the sequence from the key matches the
  886. // sequence in the file.
  887. fi, ok, err := t.getFileTrunc(it.Value(), true)
  888. if err != nil {
  889. return 0, err
  890. }
  891. if ok {
  892. if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
  893. continue
  894. }
  895. }
  896. // Either the file is missing or has a different sequence number
  897. fixed++
  898. if err := t.Delete(it.Key()); err != nil {
  899. return 0, err
  900. }
  901. }
  902. if err := it.Error(); err != nil {
  903. return 0, err
  904. }
  905. it.Release()
  906. return fixed, t.Commit()
  907. }
  908. // unchanged checks if two files are the same and thus don't need to be updated.
  909. // Local flags or the invalid bit might change without the version
  910. // being bumped.
  911. func unchanged(nf, ef protocol.FileIntf) bool {
  912. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid() && ef.FileLocalFlags() == nf.FileLocalFlags()
  913. }