leveldb_dbinstance.go 25 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "fmt"
  11. "os"
  12. "sort"
  13. "strings"
  14. "sync/atomic"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. "github.com/syncthing/syncthing/lib/sync"
  17. "github.com/syndtr/goleveldb/leveldb"
  18. "github.com/syndtr/goleveldb/leveldb/errors"
  19. "github.com/syndtr/goleveldb/leveldb/iterator"
  20. "github.com/syndtr/goleveldb/leveldb/opt"
  21. "github.com/syndtr/goleveldb/leveldb/storage"
  22. "github.com/syndtr/goleveldb/leveldb/util"
  23. )
  24. type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator)
  25. type Instance struct {
  26. committed int64 // this must be the first attribute in the struct to ensure 64 bit alignment on 32 bit plaforms
  27. *leveldb.DB
  28. location string
  29. folderIdx *smallIndex
  30. deviceIdx *smallIndex
  31. }
  32. const (
  33. keyPrefixLen = 1
  34. keyFolderLen = 4 // indexed
  35. keyDeviceLen = 4 // indexed
  36. keySequenceLen = 8
  37. keyHashLen = 32
  38. maxInt64 int64 = 1<<63 - 1
  39. )
  40. func Open(file string) (*Instance, error) {
  41. opts := &opt.Options{
  42. OpenFilesCacheCapacity: 100,
  43. WriteBuffer: 4 << 20,
  44. }
  45. db, err := leveldb.OpenFile(file, opts)
  46. if leveldbIsCorrupted(err) {
  47. db, err = leveldb.RecoverFile(file, opts)
  48. }
  49. if leveldbIsCorrupted(err) {
  50. // The database is corrupted, and we've tried to recover it but it
  51. // didn't work. At this point there isn't much to do beyond dropping
  52. // the database and reindexing...
  53. l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
  54. if err := os.RemoveAll(file); err != nil {
  55. return nil, errorSuggestion{err, "failed to delete corrupted database"}
  56. }
  57. db, err = leveldb.OpenFile(file, opts)
  58. }
  59. if err != nil {
  60. return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
  61. }
  62. return newDBInstance(db, file)
  63. }
  64. func OpenMemory() *Instance {
  65. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  66. ldb, _ := newDBInstance(db, "<memory>")
  67. return ldb
  68. }
  69. func newDBInstance(db *leveldb.DB, location string) (*Instance, error) {
  70. i := &Instance{
  71. DB: db,
  72. location: location,
  73. }
  74. i.folderIdx = newSmallIndex(i, []byte{KeyTypeFolderIdx})
  75. i.deviceIdx = newSmallIndex(i, []byte{KeyTypeDeviceIdx})
  76. err := i.updateSchema()
  77. return i, err
  78. }
  79. // Committed returns the number of items committed to the database since startup
  80. func (db *Instance) Committed() int64 {
  81. return atomic.LoadInt64(&db.committed)
  82. }
  83. // Location returns the filesystem path where the database is stored
  84. func (db *Instance) Location() string {
  85. return db.location
  86. }
  87. func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
  88. t := db.newReadWriteTransaction()
  89. defer t.close()
  90. var fk []byte
  91. var gk []byte
  92. for _, f := range fs {
  93. name := []byte(f.Name)
  94. fk = db.deviceKeyInto(fk, folder, device, name)
  95. // Get and unmarshal the file entry. If it doesn't exist or can't be
  96. // unmarshalled we'll add it as a new entry.
  97. bs, err := t.Get(fk, nil)
  98. var ef FileInfoTruncated
  99. if err == nil {
  100. err = ef.Unmarshal(bs)
  101. }
  102. // Local flags or the invalid bit might change without the version
  103. // being bumped. The IsInvalid() method handles both.
  104. if err == nil && ef.Version.Equal(f.Version) && ef.IsInvalid() == f.IsInvalid() {
  105. continue
  106. }
  107. devID := protocol.DeviceIDFromBytes(device)
  108. if err == nil {
  109. meta.removeFile(devID, ef)
  110. }
  111. meta.addFile(devID, f)
  112. t.insertFile(fk, folder, device, f)
  113. gk = db.globalKeyInto(gk, folder, name)
  114. t.updateGlobal(gk, folder, device, f, meta)
  115. // Write out and reuse the batch every few records, to avoid the batch
  116. // growing too large and thus allocating unnecessarily much memory.
  117. t.checkFlush()
  118. }
  119. }
  120. func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) {
  121. t := db.newReadWriteTransaction()
  122. defer t.close()
  123. var sk []byte
  124. var dk []byte
  125. for _, f := range fs {
  126. sk = db.sequenceKeyInto(sk, folder, f.Sequence)
  127. dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name))
  128. t.Put(sk, dk)
  129. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  130. t.checkFlush()
  131. }
  132. }
  133. func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
  134. t := db.newReadWriteTransaction()
  135. defer t.close()
  136. var sk []byte
  137. for _, f := range fs {
  138. t.Delete(db.sequenceKeyInto(sk, folder, f.Sequence))
  139. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  140. t.checkFlush()
  141. }
  142. }
  143. func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
  144. if len(prefix) > 0 {
  145. unslashedPrefix := prefix
  146. if bytes.HasSuffix(prefix, []byte{'/'}) {
  147. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  148. } else {
  149. prefix = append(prefix, '/')
  150. }
  151. if f, ok := db.getFileTrunc(db.deviceKey(folder, device, unslashedPrefix), true); ok && !fn(f) {
  152. return
  153. }
  154. }
  155. t := db.newReadOnlyTransaction()
  156. defer t.close()
  157. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, prefix)[:keyPrefixLen+keyFolderLen+keyDeviceLen+len(prefix)]), nil)
  158. defer dbi.Release()
  159. for dbi.Next() {
  160. name := db.deviceKeyName(dbi.Key())
  161. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  162. return
  163. }
  164. // The iterator function may keep a reference to the unmarshalled
  165. // struct, which in turn references the buffer it was unmarshalled
  166. // from. dbi.Value() just returns an internal slice that it reuses, so
  167. // we need to copy it.
  168. f, err := unmarshalTrunc(append([]byte{}, dbi.Value()...), truncate)
  169. if err != nil {
  170. l.Debugln("unmarshal error:", err)
  171. continue
  172. }
  173. if !fn(f) {
  174. return
  175. }
  176. }
  177. }
  178. func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
  179. t := db.newReadOnlyTransaction()
  180. defer t.close()
  181. dbi := t.NewIterator(&util.Range{Start: db.sequenceKey(folder, startSeq), Limit: db.sequenceKey(folder, maxInt64)}, nil)
  182. defer dbi.Release()
  183. for dbi.Next() {
  184. f, ok := db.getFile(dbi.Value())
  185. if !ok {
  186. l.Debugln("missing file for sequence number", db.sequenceKeySequence(dbi.Key()))
  187. continue
  188. }
  189. if shouldDebug() {
  190. key := dbi.Key()
  191. seq := int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:]))
  192. if f.Sequence != seq {
  193. panic(fmt.Sprintf("sequence index corruption, file sequence %d != expected %d", f.Sequence, seq))
  194. }
  195. }
  196. if !fn(f) {
  197. return
  198. }
  199. }
  200. }
  201. func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  202. t := db.newReadWriteTransaction()
  203. defer t.close()
  204. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen]), nil)
  205. defer dbi.Release()
  206. var gk []byte
  207. for dbi.Next() {
  208. device := db.deviceKeyDevice(dbi.Key())
  209. var f FileInfoTruncated
  210. // The iterator function may keep a reference to the unmarshalled
  211. // struct, which in turn references the buffer it was unmarshalled
  212. // from. dbi.Value() just returns an internal slice that it reuses, so
  213. // we need to copy it.
  214. err := f.Unmarshal(append([]byte{}, dbi.Value()...))
  215. if err != nil {
  216. l.Debugln("unmarshal error:", err)
  217. continue
  218. }
  219. switch f.Name {
  220. case "", ".", "..", "/": // A few obviously invalid filenames
  221. l.Infof("Dropping invalid filename %q from database", f.Name)
  222. name := []byte(f.Name)
  223. gk = db.globalKeyInto(gk, folder, name)
  224. t.removeFromGlobal(gk, folder, device, name, nil)
  225. t.Delete(dbi.Key())
  226. t.checkFlush()
  227. continue
  228. }
  229. if !fn(device, f) {
  230. return
  231. }
  232. }
  233. }
  234. func (db *Instance) getFile(key []byte) (protocol.FileInfo, bool) {
  235. if f, ok := db.getFileTrunc(key, false); ok {
  236. return f.(protocol.FileInfo), true
  237. }
  238. return protocol.FileInfo{}, false
  239. }
  240. func (db *Instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
  241. bs, err := db.Get(key, nil)
  242. if err == leveldb.ErrNotFound {
  243. return nil, false
  244. }
  245. if err != nil {
  246. l.Debugln("surprise error:", err)
  247. return nil, false
  248. }
  249. f, err := unmarshalTrunc(bs, trunc)
  250. if err != nil {
  251. l.Debugln("unmarshal error:", err)
  252. return nil, false
  253. }
  254. return f, true
  255. }
  256. func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
  257. t := db.newReadOnlyTransaction()
  258. defer t.close()
  259. _, _, f, ok := db.getGlobalInto(t, nil, nil, folder, file, truncate)
  260. return f, ok
  261. }
  262. func (db *Instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []byte, truncate bool) ([]byte, []byte, FileIntf, bool) {
  263. gk = db.globalKeyInto(gk, folder, file)
  264. bs, err := t.Get(gk, nil)
  265. if err != nil {
  266. return gk, dk, nil, false
  267. }
  268. vl, ok := unmarshalVersionList(bs)
  269. if !ok {
  270. return gk, dk, nil, false
  271. }
  272. dk = db.deviceKeyInto(dk, folder, vl.Versions[0].Device, file)
  273. if fi, ok := db.getFileTrunc(dk, truncate); ok {
  274. return gk, dk, fi, true
  275. }
  276. return gk, dk, nil, false
  277. }
  278. func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
  279. if len(prefix) > 0 {
  280. unslashedPrefix := prefix
  281. if bytes.HasSuffix(prefix, []byte{'/'}) {
  282. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  283. } else {
  284. prefix = append(prefix, '/')
  285. }
  286. if f, ok := db.getGlobal(folder, unslashedPrefix, truncate); ok && !fn(f) {
  287. return
  288. }
  289. }
  290. t := db.newReadOnlyTransaction()
  291. defer t.close()
  292. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, prefix)), nil)
  293. defer dbi.Release()
  294. var fk []byte
  295. for dbi.Next() {
  296. name := db.globalKeyName(dbi.Key())
  297. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  298. return
  299. }
  300. vl, ok := unmarshalVersionList(dbi.Value())
  301. if !ok {
  302. continue
  303. }
  304. fk = db.deviceKeyInto(fk, folder, vl.Versions[0].Device, name)
  305. f, ok := db.getFileTrunc(fk, truncate)
  306. if !ok {
  307. continue
  308. }
  309. if !fn(f) {
  310. return
  311. }
  312. }
  313. }
  314. func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
  315. k := db.globalKey(folder, file)
  316. bs, err := db.Get(k, nil)
  317. if err == leveldb.ErrNotFound {
  318. return nil
  319. }
  320. if err != nil {
  321. l.Debugln("surprise error:", err)
  322. return nil
  323. }
  324. vl, ok := unmarshalVersionList(bs)
  325. if !ok {
  326. return nil
  327. }
  328. var devices []protocol.DeviceID
  329. for _, v := range vl.Versions {
  330. if !v.Version.Equal(vl.Versions[0].Version) {
  331. break
  332. }
  333. if v.Invalid {
  334. continue
  335. }
  336. n := protocol.DeviceIDFromBytes(v.Device)
  337. devices = append(devices, n)
  338. }
  339. return devices
  340. }
  341. func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
  342. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  343. db.withNeedLocal(folder, truncate, fn)
  344. return
  345. }
  346. t := db.newReadOnlyTransaction()
  347. defer t.close()
  348. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  349. defer dbi.Release()
  350. var fk []byte
  351. for dbi.Next() {
  352. vl, ok := unmarshalVersionList(dbi.Value())
  353. if !ok {
  354. continue
  355. }
  356. haveFV, have := vl.Get(device)
  357. // XXX: This marks Concurrent (i.e. conflicting) changes as
  358. // needs. Maybe we should do that, but it needs special
  359. // handling in the puller.
  360. if have && haveFV.Version.GreaterEqual(vl.Versions[0].Version) {
  361. continue
  362. }
  363. name := db.globalKeyName(dbi.Key())
  364. needVersion := vl.Versions[0].Version
  365. needDevice := protocol.DeviceIDFromBytes(vl.Versions[0].Device)
  366. for i := range vl.Versions {
  367. if !vl.Versions[i].Version.Equal(needVersion) {
  368. // We haven't found a valid copy of the file with the needed version.
  369. break
  370. }
  371. if vl.Versions[i].Invalid {
  372. // The file is marked invalid, don't use it.
  373. continue
  374. }
  375. fk = db.deviceKeyInto(fk, folder, vl.Versions[i].Device, name)
  376. bs, err := t.Get(fk, nil)
  377. if err != nil {
  378. l.Debugln("surprise error:", err)
  379. continue
  380. }
  381. gf, err := unmarshalTrunc(bs, truncate)
  382. if err != nil {
  383. l.Debugln("unmarshal error:", err)
  384. continue
  385. }
  386. if gf.IsDeleted() && !have {
  387. // We don't need deleted files that we don't have
  388. break
  389. }
  390. l.Debugf("need folder=%q device=%v name=%q have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, have, haveFV.Invalid, haveFV.Version, needVersion, needDevice)
  391. if !fn(gf) {
  392. return
  393. }
  394. // This file is handled, no need to look further in the version list
  395. break
  396. }
  397. }
  398. }
  399. func (db *Instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
  400. t := db.newReadOnlyTransaction()
  401. defer t.close()
  402. dbi := t.NewIterator(util.BytesPrefix(db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  403. defer dbi.Release()
  404. var dk []byte
  405. var gk []byte
  406. var f FileIntf
  407. var ok bool
  408. for dbi.Next() {
  409. gk, dk, f, ok = db.getGlobalInto(t, gk, dk, folder, db.globalKeyName(dbi.Key()), truncate)
  410. if !ok {
  411. continue
  412. }
  413. if !fn(f) {
  414. return
  415. }
  416. }
  417. }
  418. func (db *Instance) ListFolders() []string {
  419. t := db.newReadOnlyTransaction()
  420. defer t.close()
  421. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  422. defer dbi.Release()
  423. folderExists := make(map[string]bool)
  424. for dbi.Next() {
  425. folder, ok := db.globalKeyFolder(dbi.Key())
  426. if ok && !folderExists[string(folder)] {
  427. folderExists[string(folder)] = true
  428. }
  429. }
  430. folders := make([]string, 0, len(folderExists))
  431. for k := range folderExists {
  432. folders = append(folders, k)
  433. }
  434. sort.Strings(folders)
  435. return folders
  436. }
  437. func (db *Instance) dropFolder(folder []byte) {
  438. t := db.newReadWriteTransaction()
  439. defer t.close()
  440. for _, key := range [][]byte{
  441. // Remove all items related to the given folder from the device->file bucket
  442. db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen],
  443. // Remove all sequences related to the folder
  444. db.sequenceKey([]byte(folder), 0)[:keyPrefixLen+keyFolderLen],
  445. // Remove all items related to the given folder from the global bucket
  446. db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen],
  447. // Remove all needs related to the folder
  448. db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen],
  449. } {
  450. t.deleteKeyPrefix(key)
  451. }
  452. }
  453. func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
  454. t := db.newReadWriteTransaction()
  455. defer t.close()
  456. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, nil)), nil)
  457. defer dbi.Release()
  458. var gk []byte
  459. for dbi.Next() {
  460. key := dbi.Key()
  461. name := db.deviceKeyName(key)
  462. gk = db.globalKeyInto(gk, folder, name)
  463. t.removeFromGlobal(gk, folder, device, name, meta)
  464. t.Delete(key)
  465. t.checkFlush()
  466. }
  467. }
  468. func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
  469. t := db.newReadWriteTransaction()
  470. defer t.close()
  471. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  472. defer dbi.Release()
  473. var fk []byte
  474. for dbi.Next() {
  475. vl, ok := unmarshalVersionList(dbi.Value())
  476. if !ok {
  477. continue
  478. }
  479. // Check the global version list for consistency. An issue in previous
  480. // versions of goleveldb could result in reordered writes so that
  481. // there are global entries pointing to no longer existing files. Here
  482. // we find those and clear them out.
  483. name := db.globalKeyName(dbi.Key())
  484. var newVL VersionList
  485. for i, version := range vl.Versions {
  486. fk = db.deviceKeyInto(fk, folder, version.Device, name)
  487. _, err := t.Get(fk, nil)
  488. if err == leveldb.ErrNotFound {
  489. continue
  490. }
  491. if err != nil {
  492. l.Debugln("surprise error:", err)
  493. return
  494. }
  495. newVL.Versions = append(newVL.Versions, version)
  496. if i == 0 {
  497. if fi, ok := db.getFile(fk); ok {
  498. meta.addFile(protocol.GlobalDeviceID, fi)
  499. }
  500. }
  501. }
  502. if len(newVL.Versions) != len(vl.Versions) {
  503. t.Put(dbi.Key(), mustMarshal(&newVL))
  504. t.checkFlush()
  505. }
  506. }
  507. l.Debugf("db check completed for %q", folder)
  508. }
  509. // deviceKey returns a byte slice encoding the following information:
  510. // keyTypeDevice (1 byte)
  511. // folder (4 bytes)
  512. // device (4 bytes)
  513. // name (variable size)
  514. func (db *Instance) deviceKey(folder, device, file []byte) []byte {
  515. return db.deviceKeyInto(nil, folder, device, file)
  516. }
  517. func (db *Instance) deviceKeyInto(k, folder, device, file []byte) []byte {
  518. reqLen := keyPrefixLen + keyFolderLen + keyDeviceLen + len(file)
  519. k = resize(k, reqLen)
  520. k[0] = KeyTypeDevice
  521. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  522. binary.BigEndian.PutUint32(k[keyPrefixLen+keyFolderLen:], db.deviceIdx.ID(device))
  523. copy(k[keyPrefixLen+keyFolderLen+keyDeviceLen:], file)
  524. return k
  525. }
  526. // deviceKeyName returns the device ID from the key
  527. func (db *Instance) deviceKeyName(key []byte) []byte {
  528. return key[keyPrefixLen+keyFolderLen+keyDeviceLen:]
  529. }
  530. // deviceKeyFolder returns the folder name from the key
  531. func (db *Instance) deviceKeyFolder(key []byte) []byte {
  532. folder, ok := db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  533. if !ok {
  534. panic("bug: lookup of nonexistent folder ID")
  535. }
  536. return folder
  537. }
  538. // deviceKeyDevice returns the device ID from the key
  539. func (db *Instance) deviceKeyDevice(key []byte) []byte {
  540. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen+keyFolderLen:]))
  541. if !ok {
  542. panic("bug: lookup of nonexistent device ID")
  543. }
  544. return device
  545. }
  546. // globalKey returns a byte slice encoding the following information:
  547. // keyTypeGlobal (1 byte)
  548. // folder (4 bytes)
  549. // name (variable size)
  550. func (db *Instance) globalKey(folder, file []byte) []byte {
  551. return db.globalKeyInto(nil, folder, file)
  552. }
  553. func (db *Instance) globalKeyInto(gk, folder, file []byte) []byte {
  554. reqLen := keyPrefixLen + keyFolderLen + len(file)
  555. gk = resize(gk, reqLen)
  556. gk[0] = KeyTypeGlobal
  557. binary.BigEndian.PutUint32(gk[keyPrefixLen:], db.folderIdx.ID(folder))
  558. copy(gk[keyPrefixLen+keyFolderLen:], file)
  559. return gk
  560. }
  561. // globalKeyName returns the filename from the key
  562. func (db *Instance) globalKeyName(key []byte) []byte {
  563. return key[keyPrefixLen+keyFolderLen:]
  564. }
  565. // globalKeyFolder returns the folder name from the key
  566. func (db *Instance) globalKeyFolder(key []byte) ([]byte, bool) {
  567. return db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  568. }
  569. // needKey is a globalKey with a different prefix
  570. func (db *Instance) needKey(folder, file []byte) []byte {
  571. return db.needKeyInto(nil, folder, file)
  572. }
  573. func (db *Instance) needKeyInto(k, folder, file []byte) []byte {
  574. k = db.globalKeyInto(k, folder, file)
  575. k[0] = KeyTypeNeed
  576. return k
  577. }
  578. // sequenceKey returns a byte slice encoding the following information:
  579. // KeyTypeSequence (1 byte)
  580. // folder (4 bytes)
  581. // sequence number (8 bytes)
  582. func (db *Instance) sequenceKey(folder []byte, seq int64) []byte {
  583. return db.sequenceKeyInto(nil, folder, seq)
  584. }
  585. func (db *Instance) sequenceKeyInto(k []byte, folder []byte, seq int64) []byte {
  586. reqLen := keyPrefixLen + keyFolderLen + keySequenceLen
  587. k = resize(k, reqLen)
  588. k[0] = KeyTypeSequence
  589. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  590. binary.BigEndian.PutUint64(k[keyPrefixLen+keyFolderLen:], uint64(seq))
  591. return k
  592. }
  593. // sequenceKeySequence returns the sequence number from the key
  594. func (db *Instance) sequenceKeySequence(key []byte) int64 {
  595. return int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:]))
  596. }
  597. func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
  598. key := db.indexIDKey(device, folder)
  599. cur, err := db.Get(key, nil)
  600. if err != nil {
  601. return 0
  602. }
  603. var id protocol.IndexID
  604. if err := id.Unmarshal(cur); err != nil {
  605. return 0
  606. }
  607. return id
  608. }
  609. func (db *Instance) setIndexID(device, folder []byte, id protocol.IndexID) {
  610. key := db.indexIDKey(device, folder)
  611. bs, _ := id.Marshal() // marshalling can't fail
  612. if err := db.Put(key, bs, nil); err != nil {
  613. panic("storing index ID: " + err.Error())
  614. }
  615. }
  616. func (db *Instance) indexIDKey(device, folder []byte) []byte {
  617. k := make([]byte, keyPrefixLen+keyDeviceLen+keyFolderLen)
  618. k[0] = KeyTypeIndexID
  619. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.deviceIdx.ID(device))
  620. binary.BigEndian.PutUint32(k[keyPrefixLen+keyDeviceLen:], db.folderIdx.ID(folder))
  621. return k
  622. }
  623. func (db *Instance) indexIDDevice(key []byte) []byte {
  624. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  625. if !ok {
  626. // uuh ...
  627. return nil
  628. }
  629. return device
  630. }
  631. func (db *Instance) mtimesKey(folder []byte) []byte {
  632. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  633. prefix[0] = KeyTypeVirtualMtime
  634. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  635. return prefix
  636. }
  637. func (db *Instance) folderMetaKey(folder []byte) []byte {
  638. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  639. prefix[0] = KeyTypeFolderMeta
  640. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  641. return prefix
  642. }
  643. // DropLocalDeltaIndexIDs removes all index IDs for the local device ID from
  644. // the database. This will cause a full index transmission on the next
  645. // connection.
  646. func (db *Instance) DropLocalDeltaIndexIDs() {
  647. db.dropDeltaIndexIDs(true)
  648. }
  649. // DropRemoteDeltaIndexIDs removes all index IDs for the other devices than
  650. // the local one from the database. This will cause them to send us a full
  651. // index on the next connection.
  652. func (db *Instance) DropRemoteDeltaIndexIDs() {
  653. db.dropDeltaIndexIDs(false)
  654. }
  655. func (db *Instance) dropDeltaIndexIDs(local bool) {
  656. t := db.newReadWriteTransaction()
  657. defer t.close()
  658. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeIndexID}), nil)
  659. defer dbi.Release()
  660. for dbi.Next() {
  661. device := db.indexIDDevice(dbi.Key())
  662. if bytes.Equal(device, protocol.LocalDeviceID[:]) == local {
  663. t.Delete(dbi.Key())
  664. }
  665. }
  666. }
  667. func (db *Instance) dropMtimes(folder []byte) {
  668. db.dropPrefix(db.mtimesKey(folder))
  669. }
  670. func (db *Instance) dropFolderMeta(folder []byte) {
  671. db.dropPrefix(db.folderMetaKey(folder))
  672. }
  673. func (db *Instance) dropPrefix(prefix []byte) {
  674. t := db.newReadWriteTransaction()
  675. defer t.close()
  676. dbi := t.NewIterator(util.BytesPrefix(prefix), nil)
  677. defer dbi.Release()
  678. for dbi.Next() {
  679. t.Delete(dbi.Key())
  680. }
  681. }
  682. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  683. if truncate {
  684. var tf FileInfoTruncated
  685. err := tf.Unmarshal(bs)
  686. return tf, err
  687. }
  688. var tf protocol.FileInfo
  689. err := tf.Unmarshal(bs)
  690. return tf, err
  691. }
  692. func unmarshalVersionList(data []byte) (VersionList, bool) {
  693. var vl VersionList
  694. if err := vl.Unmarshal(data); err != nil {
  695. l.Debugln("unmarshal error:", err)
  696. return VersionList{}, false
  697. }
  698. if len(vl.Versions) == 0 {
  699. l.Debugln("empty version list")
  700. return VersionList{}, false
  701. }
  702. return vl, true
  703. }
  704. // A "better" version of leveldb's errors.IsCorrupted.
  705. func leveldbIsCorrupted(err error) bool {
  706. switch {
  707. case err == nil:
  708. return false
  709. case errors.IsCorrupted(err):
  710. return true
  711. case strings.Contains(err.Error(), "corrupted"):
  712. return true
  713. }
  714. return false
  715. }
  716. // A smallIndex is an in memory bidirectional []byte to uint32 map. It gives
  717. // fast lookups in both directions and persists to the database. Don't use for
  718. // storing more items than fit comfortably in RAM.
  719. type smallIndex struct {
  720. db *Instance
  721. prefix []byte
  722. id2val map[uint32]string
  723. val2id map[string]uint32
  724. nextID uint32
  725. mut sync.Mutex
  726. }
  727. func newSmallIndex(db *Instance, prefix []byte) *smallIndex {
  728. idx := &smallIndex{
  729. db: db,
  730. prefix: prefix,
  731. id2val: make(map[uint32]string),
  732. val2id: make(map[string]uint32),
  733. mut: sync.NewMutex(),
  734. }
  735. idx.load()
  736. return idx
  737. }
  738. // load iterates over the prefix space in the database and populates the in
  739. // memory maps.
  740. func (i *smallIndex) load() {
  741. tr := i.db.newReadOnlyTransaction()
  742. it := tr.NewIterator(util.BytesPrefix(i.prefix), nil)
  743. for it.Next() {
  744. val := string(it.Value())
  745. id := binary.BigEndian.Uint32(it.Key()[len(i.prefix):])
  746. i.id2val[id] = val
  747. i.val2id[val] = id
  748. if id >= i.nextID {
  749. i.nextID = id + 1
  750. }
  751. }
  752. it.Release()
  753. tr.close()
  754. }
  755. // ID returns the index number for the given byte slice, allocating a new one
  756. // and persisting this to the database if necessary.
  757. func (i *smallIndex) ID(val []byte) uint32 {
  758. i.mut.Lock()
  759. // intentionally avoiding defer here as we want this call to be as fast as
  760. // possible in the general case (folder ID already exists). The map lookup
  761. // with the conversion of []byte to string is compiler optimized to not
  762. // copy the []byte, which is why we don't assign it to a temp variable
  763. // here.
  764. if id, ok := i.val2id[string(val)]; ok {
  765. i.mut.Unlock()
  766. return id
  767. }
  768. id := i.nextID
  769. i.nextID++
  770. valStr := string(val)
  771. i.val2id[valStr] = id
  772. i.id2val[id] = valStr
  773. key := make([]byte, len(i.prefix)+8) // prefix plus uint32 id
  774. copy(key, i.prefix)
  775. binary.BigEndian.PutUint32(key[len(i.prefix):], id)
  776. i.db.Put(key, val, nil)
  777. i.mut.Unlock()
  778. return id
  779. }
  780. // Val returns the value for the given index number, or (nil, false) if there
  781. // is no such index number.
  782. func (i *smallIndex) Val(id uint32) ([]byte, bool) {
  783. i.mut.Lock()
  784. val, ok := i.id2val[id]
  785. i.mut.Unlock()
  786. if !ok {
  787. return nil, false
  788. }
  789. return []byte(val), true
  790. }
  791. // resize returns a byte array of length reqLen, reusing k if possible
  792. func resize(k []byte, reqLen int) []byte {
  793. if cap(k) < reqLen {
  794. return make([]byte, reqLen)
  795. }
  796. return k[:reqLen]
  797. }
  798. type errorSuggestion struct {
  799. inner error
  800. suggestion string
  801. }
  802. func (e errorSuggestion) Error() string {
  803. return fmt.Sprintf("%s (%s)", e.inner.Error(), e.suggestion)
  804. }