leveldb_dbinstance.go 25 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "os"
  11. "sort"
  12. "strings"
  13. "sync/atomic"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sync"
  16. "github.com/syndtr/goleveldb/leveldb"
  17. "github.com/syndtr/goleveldb/leveldb/errors"
  18. "github.com/syndtr/goleveldb/leveldb/iterator"
  19. "github.com/syndtr/goleveldb/leveldb/opt"
  20. "github.com/syndtr/goleveldb/leveldb/storage"
  21. "github.com/syndtr/goleveldb/leveldb/util"
  22. )
  23. type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator)
  24. type Instance struct {
  25. committed int64 // this must be the first attribute in the struct to ensure 64 bit alignment on 32 bit plaforms
  26. *leveldb.DB
  27. location string
  28. folderIdx *smallIndex
  29. deviceIdx *smallIndex
  30. }
  // Sizes, in bytes, of the fixed-width components that make up database keys.

  // keyPrefixLen is the length of the leading key type byte.
  const keyPrefixLen = 1

  // keyFolderLen is the length of the folder component, stored as an index ID.
  const keyFolderLen = 4 // indexed

  // keyDeviceLen is the length of the device component, stored as an index ID.
  const keyDeviceLen = 4 // indexed

  // keyHashLen is the length of a hash component.
  const keyHashLen = 32
  37. func Open(file string) (*Instance, error) {
  38. opts := &opt.Options{
  39. OpenFilesCacheCapacity: 100,
  40. WriteBuffer: 4 << 20,
  41. }
  42. db, err := leveldb.OpenFile(file, opts)
  43. if leveldbIsCorrupted(err) {
  44. db, err = leveldb.RecoverFile(file, opts)
  45. }
  46. if leveldbIsCorrupted(err) {
  47. // The database is corrupted, and we've tried to recover it but it
  48. // didn't work. At this point there isn't much to do beyond dropping
  49. // the database and reindexing...
  50. l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
  51. if err := os.RemoveAll(file); err != nil {
  52. return nil, err
  53. }
  54. db, err = leveldb.OpenFile(file, opts)
  55. }
  56. if err != nil {
  57. return nil, err
  58. }
  59. return newDBInstance(db, file), nil
  60. }
  61. func OpenMemory() *Instance {
  62. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  63. return newDBInstance(db, "<memory>")
  64. }
  65. func newDBInstance(db *leveldb.DB, location string) *Instance {
  66. i := &Instance{
  67. DB: db,
  68. location: location,
  69. }
  70. i.folderIdx = newSmallIndex(i, []byte{KeyTypeFolderIdx})
  71. i.deviceIdx = newSmallIndex(i, []byte{KeyTypeDeviceIdx})
  72. return i
  73. }
  74. // Committed returns the number of items committed to the database since startup
  75. func (db *Instance) Committed() int64 {
  76. return atomic.LoadInt64(&db.committed)
  77. }
  78. // Location returns the filesystem path where the database is stored
  79. func (db *Instance) Location() string {
  80. return db.location
  81. }
  82. func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
  83. t := db.newReadWriteTransaction()
  84. defer t.close()
  85. var fk []byte
  86. for _, f := range fs {
  87. name := []byte(f.Name)
  88. fk = db.deviceKeyInto(fk[:cap(fk)], folder, device, name)
  89. // Get and unmarshal the file entry. If it doesn't exist or can't be
  90. // unmarshalled we'll add it as a new entry.
  91. bs, err := t.Get(fk, nil)
  92. var ef FileInfoTruncated
  93. if err == nil {
  94. err = ef.Unmarshal(bs)
  95. }
  96. // The Invalid flag might change without the version being bumped.
  97. if err == nil && ef.Version.Equal(f.Version) && ef.Invalid == f.Invalid {
  98. continue
  99. }
  100. devID := protocol.DeviceIDFromBytes(device)
  101. if err == nil {
  102. meta.removeFile(devID, ef)
  103. }
  104. meta.addFile(devID, f)
  105. t.insertFile(folder, device, f)
  106. t.updateGlobal(folder, device, f, meta)
  107. // Write out and reuse the batch every few records, to avoid the batch
  108. // growing too large and thus allocating unnecessarily much memory.
  109. t.checkFlush()
  110. }
  111. }
  112. func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
  113. t := db.newReadOnlyTransaction()
  114. defer t.close()
  115. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, prefix)[:keyPrefixLen+keyFolderLen+keyDeviceLen+len(prefix)]), nil)
  116. defer dbi.Release()
  117. slashedPrefix := prefix
  118. if !bytes.HasSuffix(prefix, []byte{'/'}) {
  119. slashedPrefix = append(slashedPrefix, '/')
  120. }
  121. for dbi.Next() {
  122. name := db.deviceKeyName(dbi.Key())
  123. if len(prefix) > 0 && !bytes.Equal(name, prefix) && !bytes.HasPrefix(name, slashedPrefix) {
  124. return
  125. }
  126. // The iterator function may keep a reference to the unmarshalled
  127. // struct, which in turn references the buffer it was unmarshalled
  128. // from. dbi.Value() just returns an internal slice that it reuses, so
  129. // we need to copy it.
  130. f, err := unmarshalTrunc(append([]byte{}, dbi.Value()...), truncate)
  131. if err != nil {
  132. l.Debugln("unmarshal error:", err)
  133. continue
  134. }
  135. if cont := fn(f); !cont {
  136. return
  137. }
  138. }
  139. }
  140. func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  141. t := db.newReadWriteTransaction()
  142. defer t.close()
  143. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen]), nil)
  144. defer dbi.Release()
  145. for dbi.Next() {
  146. device := db.deviceKeyDevice(dbi.Key())
  147. var f FileInfoTruncated
  148. // The iterator function may keep a reference to the unmarshalled
  149. // struct, which in turn references the buffer it was unmarshalled
  150. // from. dbi.Value() just returns an internal slice that it reuses, so
  151. // we need to copy it.
  152. err := f.Unmarshal(append([]byte{}, dbi.Value()...))
  153. if err != nil {
  154. l.Debugln("unmarshal error:", err)
  155. continue
  156. }
  157. switch f.Name {
  158. case "", ".", "..", "/": // A few obviously invalid filenames
  159. l.Infof("Dropping invalid filename %q from database", f.Name)
  160. t.removeFromGlobal(folder, device, nil, nil)
  161. t.Delete(dbi.Key())
  162. t.checkFlush()
  163. continue
  164. }
  165. if cont := fn(device, f); !cont {
  166. return
  167. }
  168. }
  169. }
  170. func (db *Instance) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
  171. return getFile(db, db.deviceKey(folder, device, file))
  172. }
  173. func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
  174. k := db.globalKey(folder, file)
  175. t := db.newReadOnlyTransaction()
  176. defer t.close()
  177. bs, err := t.Get(k, nil)
  178. if err != nil {
  179. return nil, false
  180. }
  181. var vl VersionList
  182. err = vl.Unmarshal(bs)
  183. if err == leveldb.ErrNotFound {
  184. return nil, false
  185. }
  186. if err != nil {
  187. l.Debugln("unmarshal error:", k, err)
  188. return nil, false
  189. }
  190. if len(vl.Versions) == 0 {
  191. l.Debugln("no versions:", k)
  192. return nil, false
  193. }
  194. k = db.deviceKey(folder, vl.Versions[0].Device, file)
  195. bs, err = t.Get(k, nil)
  196. if err != nil {
  197. l.Debugln("surprise error:", k, err)
  198. return nil, false
  199. }
  200. fi, err := unmarshalTrunc(bs, truncate)
  201. if err != nil {
  202. l.Debugln("unmarshal error:", k, err)
  203. return nil, false
  204. }
  205. return fi, true
  206. }
  207. func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
  208. t := db.newReadOnlyTransaction()
  209. defer t.close()
  210. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, prefix)), nil)
  211. defer dbi.Release()
  212. slashedPrefix := prefix
  213. if !bytes.HasSuffix(prefix, []byte{'/'}) {
  214. slashedPrefix = append(slashedPrefix, '/')
  215. }
  216. var fk []byte
  217. for dbi.Next() {
  218. var vl VersionList
  219. err := vl.Unmarshal(dbi.Value())
  220. if err != nil {
  221. l.Debugln("unmarshal error:", err)
  222. continue
  223. }
  224. if len(vl.Versions) == 0 {
  225. l.Debugln("no versions:", dbi.Key())
  226. continue
  227. }
  228. name := db.globalKeyName(dbi.Key())
  229. if len(prefix) > 0 && !bytes.Equal(name, prefix) && !bytes.HasPrefix(name, slashedPrefix) {
  230. return
  231. }
  232. fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.Versions[0].Device, name)
  233. bs, err := t.Get(fk, nil)
  234. if err != nil {
  235. l.Debugln("surprise error:", err)
  236. continue
  237. }
  238. f, err := unmarshalTrunc(bs, truncate)
  239. if err != nil {
  240. l.Debugln("unmarshal error:", err)
  241. continue
  242. }
  243. if cont := fn(f); !cont {
  244. return
  245. }
  246. }
  247. }
  248. func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
  249. k := db.globalKey(folder, file)
  250. bs, err := db.Get(k, nil)
  251. if err == leveldb.ErrNotFound {
  252. return nil
  253. }
  254. if err != nil {
  255. l.Debugln("surprise error:", err)
  256. return nil
  257. }
  258. var vl VersionList
  259. err = vl.Unmarshal(bs)
  260. if err != nil {
  261. l.Debugln("unmarshal error:", err)
  262. return nil
  263. }
  264. var devices []protocol.DeviceID
  265. for _, v := range vl.Versions {
  266. if !v.Version.Equal(vl.Versions[0].Version) {
  267. break
  268. }
  269. if v.Invalid {
  270. continue
  271. }
  272. n := protocol.DeviceIDFromBytes(v.Device)
  273. devices = append(devices, n)
  274. }
  275. return devices
  276. }
  277. func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
  278. t := db.newReadOnlyTransaction()
  279. defer t.close()
  280. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  281. defer dbi.Release()
  282. var fk []byte
  283. for dbi.Next() {
  284. var vl VersionList
  285. err := vl.Unmarshal(dbi.Value())
  286. if err != nil {
  287. l.Debugln("unmarshal error:", err)
  288. continue
  289. }
  290. if len(vl.Versions) == 0 {
  291. l.Debugln("no versions:", dbi.Key())
  292. continue
  293. }
  294. have := false // If we have the file, any version
  295. need := false // If we have a lower version of the file
  296. var haveFileVersion FileVersion
  297. for _, v := range vl.Versions {
  298. if bytes.Equal(v.Device, device) {
  299. have = true
  300. haveFileVersion = v
  301. // XXX: This marks Concurrent (i.e. conflicting) changes as
  302. // needs. Maybe we should do that, but it needs special
  303. // handling in the puller.
  304. need = !v.Version.GreaterEqual(vl.Versions[0].Version)
  305. break
  306. }
  307. }
  308. if have && !need {
  309. continue
  310. }
  311. name := db.globalKeyName(dbi.Key())
  312. needVersion := vl.Versions[0].Version
  313. needDevice := protocol.DeviceIDFromBytes(vl.Versions[0].Device)
  314. for i := range vl.Versions {
  315. if !vl.Versions[i].Version.Equal(needVersion) {
  316. // We haven't found a valid copy of the file with the needed version.
  317. break
  318. }
  319. if vl.Versions[i].Invalid {
  320. // The file is marked invalid, don't use it.
  321. continue
  322. }
  323. fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.Versions[i].Device, name)
  324. bs, err := t.Get(fk, nil)
  325. if err != nil {
  326. l.Debugln("surprise error:", err)
  327. continue
  328. }
  329. gf, err := unmarshalTrunc(bs, truncate)
  330. if err != nil {
  331. l.Debugln("unmarshal error:", err)
  332. continue
  333. }
  334. if gf.IsDeleted() && !have {
  335. // We don't need deleted files that we don't have
  336. break
  337. }
  338. l.Debugf("need folder=%q device=%v name=%q need=%v have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, need, have, haveFileVersion.Invalid, haveFileVersion.Version, needVersion, needDevice)
  339. if cont := fn(gf); !cont {
  340. return
  341. }
  342. // This file is handled, no need to look further in the version list
  343. break
  344. }
  345. }
  346. }
  347. func (db *Instance) ListFolders() []string {
  348. t := db.newReadOnlyTransaction()
  349. defer t.close()
  350. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  351. defer dbi.Release()
  352. folderExists := make(map[string]bool)
  353. for dbi.Next() {
  354. folder, ok := db.globalKeyFolder(dbi.Key())
  355. if ok && !folderExists[string(folder)] {
  356. folderExists[string(folder)] = true
  357. }
  358. }
  359. folders := make([]string, 0, len(folderExists))
  360. for k := range folderExists {
  361. folders = append(folders, k)
  362. }
  363. sort.Strings(folders)
  364. return folders
  365. }
  366. func (db *Instance) dropFolder(folder []byte) {
  367. t := db.newReadOnlyTransaction()
  368. defer t.close()
  369. // Remove all items related to the given folder from the device->file bucket
  370. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeDevice}), nil)
  371. for dbi.Next() {
  372. itemFolder := db.deviceKeyFolder(dbi.Key())
  373. if bytes.Equal(folder, itemFolder) {
  374. db.Delete(dbi.Key(), nil)
  375. }
  376. }
  377. dbi.Release()
  378. // Remove all items related to the given folder from the global bucket
  379. dbi = t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  380. for dbi.Next() {
  381. itemFolder, ok := db.globalKeyFolder(dbi.Key())
  382. if ok && bytes.Equal(folder, itemFolder) {
  383. db.Delete(dbi.Key(), nil)
  384. }
  385. }
  386. dbi.Release()
  387. }
  388. func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
  389. t := db.newReadWriteTransaction()
  390. defer t.close()
  391. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, nil)), nil)
  392. defer dbi.Release()
  393. for dbi.Next() {
  394. key := dbi.Key()
  395. name := db.deviceKeyName(key)
  396. t.removeFromGlobal(folder, device, name, meta)
  397. t.Delete(key)
  398. t.checkFlush()
  399. }
  400. }
  401. func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
  402. t := db.newReadWriteTransaction()
  403. defer t.close()
  404. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  405. defer dbi.Release()
  406. var fk []byte
  407. for dbi.Next() {
  408. gk := dbi.Key()
  409. var vl VersionList
  410. err := vl.Unmarshal(dbi.Value())
  411. if err != nil {
  412. l.Debugln("unmarshal error:", err)
  413. continue
  414. }
  415. // Check the global version list for consistency. An issue in previous
  416. // versions of goleveldb could result in reordered writes so that
  417. // there are global entries pointing to no longer existing files. Here
  418. // we find those and clear them out.
  419. name := db.globalKeyName(gk)
  420. var newVL VersionList
  421. for i, version := range vl.Versions {
  422. fk = db.deviceKeyInto(fk[:cap(fk)], folder, version.Device, name)
  423. _, err := t.Get(fk, nil)
  424. if err == leveldb.ErrNotFound {
  425. continue
  426. }
  427. if err != nil {
  428. l.Debugln("surprise error:", err)
  429. return
  430. }
  431. newVL.Versions = append(newVL.Versions, version)
  432. if i == 0 {
  433. if fi, ok := t.getFile(folder, version.Device, name); ok {
  434. meta.addFile(globalDeviceID, fi)
  435. }
  436. }
  437. }
  438. if len(newVL.Versions) != len(vl.Versions) {
  439. t.Put(dbi.Key(), mustMarshal(&newVL))
  440. t.checkFlush()
  441. }
  442. }
  443. l.Debugf("db check completed for %q", folder)
  444. }
  445. // ConvertSymlinkTypes should be run once only on an old database. It
  446. // changes SYMLINK_FILE and SYMLINK_DIRECTORY types to the current SYMLINK
  447. // type (previously SYMLINK_UNKNOWN). It does this for all devices, both
  448. // local and remote, and does not reset delta indexes. It shouldn't really
  449. // matter what the symlink type is, but this cleans it up for a possible
  450. // future when SYMLINK_FILE and SYMLINK_DIRECTORY are no longer understood.
  451. func (db *Instance) ConvertSymlinkTypes() {
  452. t := db.newReadWriteTransaction()
  453. defer t.close()
  454. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeDevice}), nil)
  455. defer dbi.Release()
  456. conv := 0
  457. for dbi.Next() {
  458. var f protocol.FileInfo
  459. if err := f.Unmarshal(dbi.Value()); err != nil {
  460. // probably can't happen
  461. continue
  462. }
  463. if f.Type == protocol.FileInfoTypeDeprecatedSymlinkDirectory || f.Type == protocol.FileInfoTypeDeprecatedSymlinkFile {
  464. f.Type = protocol.FileInfoTypeSymlink
  465. bs, err := f.Marshal()
  466. if err != nil {
  467. panic("can't happen: " + err.Error())
  468. }
  469. t.Put(dbi.Key(), bs)
  470. t.checkFlush()
  471. conv++
  472. }
  473. }
  474. l.Infof("Updated symlink type for %d index entries", conv)
  475. }
  476. // AddInvalidToGlobal searches for invalid files and adds them to the global list.
  477. // Invalid files exist in the db if they once were not ignored and subsequently
  478. // ignored. In the new system this is still valid, but invalid files must also be
  479. // in the global list such that they cannot be mistaken for missing files.
  480. func (db *Instance) AddInvalidToGlobal(folder, device []byte) int {
  481. t := db.newReadWriteTransaction()
  482. defer t.close()
  483. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, nil)[:keyPrefixLen+keyFolderLen+keyDeviceLen]), nil)
  484. defer dbi.Release()
  485. changed := 0
  486. for dbi.Next() {
  487. var file protocol.FileInfo
  488. if err := file.Unmarshal(dbi.Value()); err != nil {
  489. // probably can't happen
  490. continue
  491. }
  492. if file.Invalid {
  493. changed++
  494. l.Debugf("add invalid to global; folder=%q device=%v file=%q version=%v", folder, protocol.DeviceIDFromBytes(device), file.Name, file.Version)
  495. // this is an adapted version of readWriteTransaction.updateGlobal
  496. name := []byte(file.Name)
  497. gk := t.db.globalKey(folder, name)
  498. var fl VersionList
  499. if svl, err := t.Get(gk, nil); err == nil {
  500. fl.Unmarshal(svl) // skip error, range handles success case
  501. }
  502. nv := FileVersion{
  503. Device: device,
  504. Version: file.Version,
  505. Invalid: file.Invalid,
  506. }
  507. inserted := false
  508. // Find a position in the list to insert this file. The file at the front
  509. // of the list is the newer, the "global".
  510. insert:
  511. for i := range fl.Versions {
  512. switch fl.Versions[i].Version.Compare(file.Version) {
  513. case protocol.Equal:
  514. // Invalid files should go after a valid file of equal version
  515. if nv.Invalid {
  516. continue insert
  517. }
  518. fallthrough
  519. case protocol.Lesser:
  520. // The version at this point in the list is equal to or lesser
  521. // ("older") than us. We insert ourselves in front of it.
  522. fl.Versions = insertVersion(fl.Versions, i, nv)
  523. inserted = true
  524. break insert
  525. case protocol.ConcurrentLesser, protocol.ConcurrentGreater:
  526. // The version at this point is in conflict with us. We must pull
  527. // the actual file metadata to determine who wins. If we win, we
  528. // insert ourselves in front of the loser here. (The "Lesser" and
  529. // "Greater" in the condition above is just based on the device
  530. // IDs in the version vector, which is not the only thing we use
  531. // to determine the winner.)
  532. //
  533. // A surprise missing file entry here is counted as a win for us.
  534. of, ok := t.getFile(folder, fl.Versions[i].Device, name)
  535. if !ok || file.WinsConflict(of) {
  536. fl.Versions = insertVersion(fl.Versions, i, nv)
  537. inserted = true
  538. break insert
  539. }
  540. }
  541. }
  542. if !inserted {
  543. // We didn't find a position for an insert above, so append to the end.
  544. fl.Versions = append(fl.Versions, nv)
  545. }
  546. t.Put(gk, mustMarshal(&fl))
  547. }
  548. }
  549. return changed
  550. }
  551. // deviceKey returns a byte slice encoding the following information:
  552. // keyTypeDevice (1 byte)
  553. // folder (4 bytes)
  554. // device (4 bytes)
  555. // name (variable size)
  556. func (db *Instance) deviceKey(folder, device, file []byte) []byte {
  557. return db.deviceKeyInto(nil, folder, device, file)
  558. }
  559. func (db *Instance) deviceKeyInto(k []byte, folder, device, file []byte) []byte {
  560. reqLen := keyPrefixLen + keyFolderLen + keyDeviceLen + len(file)
  561. if len(k) < reqLen {
  562. k = make([]byte, reqLen)
  563. }
  564. k[0] = KeyTypeDevice
  565. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  566. binary.BigEndian.PutUint32(k[keyPrefixLen+keyFolderLen:], db.deviceIdx.ID(device))
  567. copy(k[keyPrefixLen+keyFolderLen+keyDeviceLen:], file)
  568. return k[:reqLen]
  569. }
  570. // deviceKeyName returns the device ID from the key
  571. func (db *Instance) deviceKeyName(key []byte) []byte {
  572. return key[keyPrefixLen+keyFolderLen+keyDeviceLen:]
  573. }
  574. // deviceKeyFolder returns the folder name from the key
  575. func (db *Instance) deviceKeyFolder(key []byte) []byte {
  576. folder, ok := db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  577. if !ok {
  578. panic("bug: lookup of nonexistent folder ID")
  579. }
  580. return folder
  581. }
  582. // deviceKeyDevice returns the device ID from the key
  583. func (db *Instance) deviceKeyDevice(key []byte) []byte {
  584. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen+keyFolderLen:]))
  585. if !ok {
  586. panic("bug: lookup of nonexistent device ID")
  587. }
  588. return device
  589. }
  590. // globalKey returns a byte slice encoding the following information:
  591. // keyTypeGlobal (1 byte)
  592. // folder (4 bytes)
  593. // name (variable size)
  594. func (db *Instance) globalKey(folder, file []byte) []byte {
  595. k := make([]byte, keyPrefixLen+keyFolderLen+len(file))
  596. k[0] = KeyTypeGlobal
  597. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  598. copy(k[keyPrefixLen+keyFolderLen:], file)
  599. return k
  600. }
  601. // globalKeyName returns the filename from the key
  602. func (db *Instance) globalKeyName(key []byte) []byte {
  603. return key[keyPrefixLen+keyFolderLen:]
  604. }
  605. // globalKeyFolder returns the folder name from the key
  606. func (db *Instance) globalKeyFolder(key []byte) ([]byte, bool) {
  607. return db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  608. }
  609. func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
  610. key := db.indexIDKey(device, folder)
  611. cur, err := db.Get(key, nil)
  612. if err != nil {
  613. return 0
  614. }
  615. var id protocol.IndexID
  616. if err := id.Unmarshal(cur); err != nil {
  617. return 0
  618. }
  619. return id
  620. }
  621. func (db *Instance) setIndexID(device, folder []byte, id protocol.IndexID) {
  622. key := db.indexIDKey(device, folder)
  623. bs, _ := id.Marshal() // marshalling can't fail
  624. if err := db.Put(key, bs, nil); err != nil {
  625. panic("storing index ID: " + err.Error())
  626. }
  627. }
  628. func (db *Instance) indexIDKey(device, folder []byte) []byte {
  629. k := make([]byte, keyPrefixLen+keyDeviceLen+keyFolderLen)
  630. k[0] = KeyTypeIndexID
  631. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.deviceIdx.ID(device))
  632. binary.BigEndian.PutUint32(k[keyPrefixLen+keyDeviceLen:], db.folderIdx.ID(folder))
  633. return k
  634. }
  635. func (db *Instance) indexIDDevice(key []byte) []byte {
  636. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  637. if !ok {
  638. // uuh ...
  639. return nil
  640. }
  641. return device
  642. }
  643. func (db *Instance) mtimesKey(folder []byte) []byte {
  644. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  645. prefix[0] = KeyTypeVirtualMtime
  646. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  647. return prefix
  648. }
  649. func (db *Instance) folderMetaKey(folder []byte) []byte {
  650. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  651. prefix[0] = KeyTypeFolderMeta
  652. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  653. return prefix
  654. }
  655. // DropLocalDeltaIndexIDs removes all index IDs for the local device ID from
  656. // the database. This will cause a full index transmission on the next
  657. // connection.
  658. func (db *Instance) DropLocalDeltaIndexIDs() {
  659. db.dropDeltaIndexIDs(true)
  660. }
  661. // DropRemoteDeltaIndexIDs removes all index IDs for the other devices than
  662. // the local one from the database. This will cause them to send us a full
  663. // index on the next connection.
  664. func (db *Instance) DropRemoteDeltaIndexIDs() {
  665. db.dropDeltaIndexIDs(false)
  666. }
  667. func (db *Instance) dropDeltaIndexIDs(local bool) {
  668. t := db.newReadWriteTransaction()
  669. defer t.close()
  670. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeIndexID}), nil)
  671. defer dbi.Release()
  672. for dbi.Next() {
  673. device := db.indexIDDevice(dbi.Key())
  674. if bytes.Equal(device, protocol.LocalDeviceID[:]) == local {
  675. t.Delete(dbi.Key())
  676. }
  677. }
  678. }
  679. func (db *Instance) dropMtimes(folder []byte) {
  680. db.dropPrefix(db.mtimesKey(folder))
  681. }
  682. func (db *Instance) dropFolderMeta(folder []byte) {
  683. db.dropPrefix(db.folderMetaKey(folder))
  684. }
  685. func (db *Instance) dropPrefix(prefix []byte) {
  686. t := db.newReadWriteTransaction()
  687. defer t.close()
  688. dbi := t.NewIterator(util.BytesPrefix(prefix), nil)
  689. defer dbi.Release()
  690. for dbi.Next() {
  691. t.Delete(dbi.Key())
  692. }
  693. }
  694. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  695. if truncate {
  696. var tf FileInfoTruncated
  697. err := tf.Unmarshal(bs)
  698. return tf, err
  699. }
  700. var tf protocol.FileInfo
  701. err := tf.Unmarshal(bs)
  702. return tf, err
  703. }
  704. // A "better" version of leveldb's errors.IsCorrupted.
  705. func leveldbIsCorrupted(err error) bool {
  706. switch {
  707. case err == nil:
  708. return false
  709. case errors.IsCorrupted(err):
  710. return true
  711. case strings.Contains(err.Error(), "corrupted"):
  712. return true
  713. }
  714. return false
  715. }
  716. // A smallIndex is an in memory bidirectional []byte to uint32 map. It gives
  717. // fast lookups in both directions and persists to the database. Don't use for
  718. // storing more items than fit comfortably in RAM.
  719. type smallIndex struct {
  720. db *Instance
  721. prefix []byte
  722. id2val map[uint32]string
  723. val2id map[string]uint32
  724. nextID uint32
  725. mut sync.Mutex
  726. }
  727. func newSmallIndex(db *Instance, prefix []byte) *smallIndex {
  728. idx := &smallIndex{
  729. db: db,
  730. prefix: prefix,
  731. id2val: make(map[uint32]string),
  732. val2id: make(map[string]uint32),
  733. mut: sync.NewMutex(),
  734. }
  735. idx.load()
  736. return idx
  737. }
  738. // load iterates over the prefix space in the database and populates the in
  739. // memory maps.
  740. func (i *smallIndex) load() {
  741. tr := i.db.newReadOnlyTransaction()
  742. it := tr.NewIterator(util.BytesPrefix(i.prefix), nil)
  743. for it.Next() {
  744. val := string(it.Value())
  745. id := binary.BigEndian.Uint32(it.Key()[len(i.prefix):])
  746. i.id2val[id] = val
  747. i.val2id[val] = id
  748. if id >= i.nextID {
  749. i.nextID = id + 1
  750. }
  751. }
  752. it.Release()
  753. tr.close()
  754. }
  755. // ID returns the index number for the given byte slice, allocating a new one
  756. // and persisting this to the database if necessary.
  757. func (i *smallIndex) ID(val []byte) uint32 {
  758. i.mut.Lock()
  759. // intentionally avoiding defer here as we want this call to be as fast as
  760. // possible in the general case (folder ID already exists). The map lookup
  761. // with the conversion of []byte to string is compiler optimized to not
  762. // copy the []byte, which is why we don't assign it to a temp variable
  763. // here.
  764. if id, ok := i.val2id[string(val)]; ok {
  765. i.mut.Unlock()
  766. return id
  767. }
  768. id := i.nextID
  769. i.nextID++
  770. valStr := string(val)
  771. i.val2id[valStr] = id
  772. i.id2val[id] = valStr
  773. key := make([]byte, len(i.prefix)+8) // prefix plus uint32 id
  774. copy(key, i.prefix)
  775. binary.BigEndian.PutUint32(key[len(i.prefix):], id)
  776. i.db.Put(key, val, nil)
  777. i.mut.Unlock()
  778. return id
  779. }
  780. // Val returns the value for the given index number, or (nil, false) if there
  781. // is no such index number.
  782. func (i *smallIndex) Val(id uint32) ([]byte, bool) {
  783. i.mut.Lock()
  784. val, ok := i.id2val[id]
  785. i.mut.Unlock()
  786. if !ok {
  787. return nil, false
  788. }
  789. return []byte(val), true
  790. }