leveldb_dbinstance.go 24 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "fmt"
  11. "os"
  12. "sort"
  13. "strings"
  14. "sync/atomic"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. "github.com/syncthing/syncthing/lib/sync"
  17. "github.com/syndtr/goleveldb/leveldb"
  18. "github.com/syndtr/goleveldb/leveldb/errors"
  19. "github.com/syndtr/goleveldb/leveldb/iterator"
  20. "github.com/syndtr/goleveldb/leveldb/opt"
  21. "github.com/syndtr/goleveldb/leveldb/storage"
  22. "github.com/syndtr/goleveldb/leveldb/util"
  23. )
  24. type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator)
  25. type Instance struct {
  26. committed int64 // this must be the first attribute in the struct to ensure 64 bit alignment on 32 bit plaforms
  27. *leveldb.DB
  28. location string
  29. folderIdx *smallIndex
  30. deviceIdx *smallIndex
  31. }
  32. const (
  33. keyPrefixLen = 1
  34. keyFolderLen = 4 // indexed
  35. keyDeviceLen = 4 // indexed
  36. keySequenceLen = 8
  37. keyHashLen = 32
  38. maxInt64 int64 = 1<<63 - 1
  39. )
  40. func Open(file string) (*Instance, error) {
  41. opts := &opt.Options{
  42. OpenFilesCacheCapacity: 100,
  43. WriteBuffer: 4 << 20,
  44. }
  45. db, err := leveldb.OpenFile(file, opts)
  46. if leveldbIsCorrupted(err) {
  47. db, err = leveldb.RecoverFile(file, opts)
  48. }
  49. if leveldbIsCorrupted(err) {
  50. // The database is corrupted, and we've tried to recover it but it
  51. // didn't work. At this point there isn't much to do beyond dropping
  52. // the database and reindexing...
  53. l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
  54. if err := os.RemoveAll(file); err != nil {
  55. return nil, errorSuggestion{err, "failed to delete corrupted database"}
  56. }
  57. db, err = leveldb.OpenFile(file, opts)
  58. }
  59. if err != nil {
  60. return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
  61. }
  62. return newDBInstance(db, file)
  63. }
  64. func OpenMemory() *Instance {
  65. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  66. ldb, _ := newDBInstance(db, "<memory>")
  67. return ldb
  68. }
  69. func newDBInstance(db *leveldb.DB, location string) (*Instance, error) {
  70. i := &Instance{
  71. DB: db,
  72. location: location,
  73. }
  74. i.folderIdx = newSmallIndex(i, []byte{KeyTypeFolderIdx})
  75. i.deviceIdx = newSmallIndex(i, []byte{KeyTypeDeviceIdx})
  76. err := i.updateSchema()
  77. return i, err
  78. }
  79. // Committed returns the number of items committed to the database since startup
  80. func (db *Instance) Committed() int64 {
  81. return atomic.LoadInt64(&db.committed)
  82. }
  83. // Location returns the filesystem path where the database is stored
  84. func (db *Instance) Location() string {
  85. return db.location
  86. }
  87. func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
  88. t := db.newReadWriteTransaction()
  89. defer t.close()
  90. var fk []byte
  91. var gk []byte
  92. for _, f := range fs {
  93. name := []byte(f.Name)
  94. fk = db.deviceKeyInto(fk, folder, device, name)
  95. // Get and unmarshal the file entry. If it doesn't exist or can't be
  96. // unmarshalled we'll add it as a new entry.
  97. bs, err := t.Get(fk, nil)
  98. var ef FileInfoTruncated
  99. if err == nil {
  100. err = ef.Unmarshal(bs)
  101. }
  102. // Local flags or the invalid bit might change without the version
  103. // being bumped. The IsInvalid() method handles both.
  104. if err == nil && ef.Version.Equal(f.Version) && ef.IsInvalid() == f.IsInvalid() {
  105. continue
  106. }
  107. devID := protocol.DeviceIDFromBytes(device)
  108. if err == nil {
  109. meta.removeFile(devID, ef)
  110. }
  111. meta.addFile(devID, f)
  112. t.insertFile(fk, folder, device, f)
  113. gk = db.globalKeyInto(gk, folder, name)
  114. t.updateGlobal(gk, folder, device, f, meta)
  115. // Write out and reuse the batch every few records, to avoid the batch
  116. // growing too large and thus allocating unnecessarily much memory.
  117. t.checkFlush()
  118. }
  119. }
  120. func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) {
  121. t := db.newReadWriteTransaction()
  122. defer t.close()
  123. var sk []byte
  124. var dk []byte
  125. for _, f := range fs {
  126. sk = db.sequenceKeyInto(sk, folder, f.Sequence)
  127. dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name))
  128. t.Put(sk, dk)
  129. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  130. t.checkFlush()
  131. }
  132. }
  133. func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
  134. t := db.newReadWriteTransaction()
  135. defer t.close()
  136. var sk []byte
  137. for _, f := range fs {
  138. t.Delete(db.sequenceKeyInto(sk, folder, f.Sequence))
  139. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  140. t.checkFlush()
  141. }
  142. }
  143. func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
  144. if len(prefix) > 0 {
  145. unslashedPrefix := prefix
  146. if bytes.HasSuffix(prefix, []byte{'/'}) {
  147. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  148. } else {
  149. prefix = append(prefix, '/')
  150. }
  151. if f, ok := db.getFileTrunc(db.deviceKey(folder, device, unslashedPrefix), true); ok && !fn(f) {
  152. return
  153. }
  154. }
  155. t := db.newReadOnlyTransaction()
  156. defer t.close()
  157. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, prefix)[:keyPrefixLen+keyFolderLen+keyDeviceLen+len(prefix)]), nil)
  158. defer dbi.Release()
  159. for dbi.Next() {
  160. name := db.deviceKeyName(dbi.Key())
  161. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  162. return
  163. }
  164. // The iterator function may keep a reference to the unmarshalled
  165. // struct, which in turn references the buffer it was unmarshalled
  166. // from. dbi.Value() just returns an internal slice that it reuses, so
  167. // we need to copy it.
  168. f, err := unmarshalTrunc(append([]byte{}, dbi.Value()...), truncate)
  169. if err != nil {
  170. l.Debugln("unmarshal error:", err)
  171. continue
  172. }
  173. if !fn(f) {
  174. return
  175. }
  176. }
  177. }
  178. func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
  179. t := db.newReadOnlyTransaction()
  180. defer t.close()
  181. dbi := t.NewIterator(&util.Range{Start: db.sequenceKey(folder, startSeq), Limit: db.sequenceKey(folder, maxInt64)}, nil)
  182. defer dbi.Release()
  183. for dbi.Next() {
  184. f, ok := db.getFile(dbi.Value())
  185. if !ok {
  186. l.Debugln("missing file for sequence number", db.sequenceKeySequence(dbi.Key()))
  187. continue
  188. }
  189. if !fn(f) {
  190. return
  191. }
  192. }
  193. }
  194. func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  195. t := db.newReadWriteTransaction()
  196. defer t.close()
  197. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen]), nil)
  198. defer dbi.Release()
  199. var gk []byte
  200. for dbi.Next() {
  201. device := db.deviceKeyDevice(dbi.Key())
  202. var f FileInfoTruncated
  203. // The iterator function may keep a reference to the unmarshalled
  204. // struct, which in turn references the buffer it was unmarshalled
  205. // from. dbi.Value() just returns an internal slice that it reuses, so
  206. // we need to copy it.
  207. err := f.Unmarshal(append([]byte{}, dbi.Value()...))
  208. if err != nil {
  209. l.Debugln("unmarshal error:", err)
  210. continue
  211. }
  212. switch f.Name {
  213. case "", ".", "..", "/": // A few obviously invalid filenames
  214. l.Infof("Dropping invalid filename %q from database", f.Name)
  215. name := []byte(f.Name)
  216. gk = db.globalKeyInto(gk, folder, name)
  217. t.removeFromGlobal(gk, folder, device, name, nil)
  218. t.Delete(dbi.Key())
  219. t.checkFlush()
  220. continue
  221. }
  222. if !fn(device, f) {
  223. return
  224. }
  225. }
  226. }
  227. func (db *Instance) getFile(key []byte) (protocol.FileInfo, bool) {
  228. if f, ok := db.getFileTrunc(key, false); ok {
  229. return f.(protocol.FileInfo), true
  230. }
  231. return protocol.FileInfo{}, false
  232. }
  233. func (db *Instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
  234. bs, err := db.Get(key, nil)
  235. if err == leveldb.ErrNotFound {
  236. return nil, false
  237. }
  238. if err != nil {
  239. l.Debugln("surprise error:", err)
  240. return nil, false
  241. }
  242. f, err := unmarshalTrunc(bs, trunc)
  243. if err != nil {
  244. l.Debugln("unmarshal error:", err)
  245. return nil, false
  246. }
  247. return f, true
  248. }
  249. func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
  250. t := db.newReadOnlyTransaction()
  251. defer t.close()
  252. _, _, f, ok := db.getGlobalInto(t, nil, nil, folder, file, truncate)
  253. return f, ok
  254. }
  255. func (db *Instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []byte, truncate bool) ([]byte, []byte, FileIntf, bool) {
  256. gk = db.globalKeyInto(gk, folder, file)
  257. bs, err := t.Get(gk, nil)
  258. if err != nil {
  259. return gk, dk, nil, false
  260. }
  261. vl, ok := unmarshalVersionList(bs)
  262. if !ok {
  263. return gk, dk, nil, false
  264. }
  265. dk = db.deviceKeyInto(dk, folder, vl.Versions[0].Device, file)
  266. if fi, ok := db.getFileTrunc(dk, truncate); ok {
  267. return gk, dk, fi, true
  268. }
  269. return gk, dk, nil, false
  270. }
  271. func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
  272. if len(prefix) > 0 {
  273. unslashedPrefix := prefix
  274. if bytes.HasSuffix(prefix, []byte{'/'}) {
  275. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  276. } else {
  277. prefix = append(prefix, '/')
  278. }
  279. if f, ok := db.getGlobal(folder, unslashedPrefix, truncate); ok && !fn(f) {
  280. return
  281. }
  282. }
  283. t := db.newReadOnlyTransaction()
  284. defer t.close()
  285. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, prefix)), nil)
  286. defer dbi.Release()
  287. var fk []byte
  288. for dbi.Next() {
  289. name := db.globalKeyName(dbi.Key())
  290. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  291. return
  292. }
  293. vl, ok := unmarshalVersionList(dbi.Value())
  294. if !ok {
  295. continue
  296. }
  297. fk = db.deviceKeyInto(fk, folder, vl.Versions[0].Device, name)
  298. f, ok := db.getFileTrunc(fk, truncate)
  299. if !ok {
  300. continue
  301. }
  302. if !fn(f) {
  303. return
  304. }
  305. }
  306. }
  307. func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
  308. k := db.globalKey(folder, file)
  309. bs, err := db.Get(k, nil)
  310. if err == leveldb.ErrNotFound {
  311. return nil
  312. }
  313. if err != nil {
  314. l.Debugln("surprise error:", err)
  315. return nil
  316. }
  317. vl, ok := unmarshalVersionList(bs)
  318. if !ok {
  319. return nil
  320. }
  321. var devices []protocol.DeviceID
  322. for _, v := range vl.Versions {
  323. if !v.Version.Equal(vl.Versions[0].Version) {
  324. break
  325. }
  326. if v.Invalid {
  327. continue
  328. }
  329. n := protocol.DeviceIDFromBytes(v.Device)
  330. devices = append(devices, n)
  331. }
  332. return devices
  333. }
  334. func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
  335. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  336. db.withNeedLocal(folder, truncate, fn)
  337. return
  338. }
  339. t := db.newReadOnlyTransaction()
  340. defer t.close()
  341. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  342. defer dbi.Release()
  343. var fk []byte
  344. for dbi.Next() {
  345. vl, ok := unmarshalVersionList(dbi.Value())
  346. if !ok {
  347. continue
  348. }
  349. haveFV, have := vl.Get(device)
  350. // XXX: This marks Concurrent (i.e. conflicting) changes as
  351. // needs. Maybe we should do that, but it needs special
  352. // handling in the puller.
  353. if have && haveFV.Version.GreaterEqual(vl.Versions[0].Version) {
  354. continue
  355. }
  356. name := db.globalKeyName(dbi.Key())
  357. needVersion := vl.Versions[0].Version
  358. needDevice := protocol.DeviceIDFromBytes(vl.Versions[0].Device)
  359. for i := range vl.Versions {
  360. if !vl.Versions[i].Version.Equal(needVersion) {
  361. // We haven't found a valid copy of the file with the needed version.
  362. break
  363. }
  364. if vl.Versions[i].Invalid {
  365. // The file is marked invalid, don't use it.
  366. continue
  367. }
  368. fk = db.deviceKeyInto(fk, folder, vl.Versions[i].Device, name)
  369. bs, err := t.Get(fk, nil)
  370. if err != nil {
  371. l.Debugln("surprise error:", err)
  372. continue
  373. }
  374. gf, err := unmarshalTrunc(bs, truncate)
  375. if err != nil {
  376. l.Debugln("unmarshal error:", err)
  377. continue
  378. }
  379. if gf.IsDeleted() && !have {
  380. // We don't need deleted files that we don't have
  381. break
  382. }
  383. l.Debugf("need folder=%q device=%v name=%q have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, have, haveFV.Invalid, haveFV.Version, needVersion, needDevice)
  384. if !fn(gf) {
  385. return
  386. }
  387. // This file is handled, no need to look further in the version list
  388. break
  389. }
  390. }
  391. }
  392. func (db *Instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
  393. t := db.newReadOnlyTransaction()
  394. defer t.close()
  395. dbi := t.NewIterator(util.BytesPrefix(db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  396. defer dbi.Release()
  397. var dk []byte
  398. var gk []byte
  399. var f FileIntf
  400. var ok bool
  401. for dbi.Next() {
  402. gk, dk, f, ok = db.getGlobalInto(t, gk, dk, folder, db.globalKeyName(dbi.Key()), truncate)
  403. if !ok {
  404. continue
  405. }
  406. if !fn(f) {
  407. return
  408. }
  409. }
  410. }
  411. func (db *Instance) ListFolders() []string {
  412. t := db.newReadOnlyTransaction()
  413. defer t.close()
  414. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  415. defer dbi.Release()
  416. folderExists := make(map[string]bool)
  417. for dbi.Next() {
  418. folder, ok := db.globalKeyFolder(dbi.Key())
  419. if ok && !folderExists[string(folder)] {
  420. folderExists[string(folder)] = true
  421. }
  422. }
  423. folders := make([]string, 0, len(folderExists))
  424. for k := range folderExists {
  425. folders = append(folders, k)
  426. }
  427. sort.Strings(folders)
  428. return folders
  429. }
  430. func (db *Instance) dropFolder(folder []byte) {
  431. t := db.newReadWriteTransaction()
  432. defer t.close()
  433. for _, key := range [][]byte{
  434. // Remove all items related to the given folder from the device->file bucket
  435. db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen],
  436. // Remove all sequences related to the folder
  437. db.sequenceKey([]byte(folder), 0)[:keyPrefixLen+keyFolderLen],
  438. // Remove all items related to the given folder from the global bucket
  439. db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen],
  440. // Remove all needs related to the folder
  441. db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen],
  442. } {
  443. t.deleteKeyPrefix(key)
  444. }
  445. }
  446. func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
  447. t := db.newReadWriteTransaction()
  448. defer t.close()
  449. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, nil)), nil)
  450. defer dbi.Release()
  451. var gk []byte
  452. for dbi.Next() {
  453. key := dbi.Key()
  454. name := db.deviceKeyName(key)
  455. gk = db.globalKeyInto(gk, folder, name)
  456. t.removeFromGlobal(gk, folder, device, name, meta)
  457. t.Delete(key)
  458. t.checkFlush()
  459. }
  460. }
  461. func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
  462. t := db.newReadWriteTransaction()
  463. defer t.close()
  464. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  465. defer dbi.Release()
  466. var fk []byte
  467. for dbi.Next() {
  468. vl, ok := unmarshalVersionList(dbi.Value())
  469. if !ok {
  470. continue
  471. }
  472. // Check the global version list for consistency. An issue in previous
  473. // versions of goleveldb could result in reordered writes so that
  474. // there are global entries pointing to no longer existing files. Here
  475. // we find those and clear them out.
  476. name := db.globalKeyName(dbi.Key())
  477. var newVL VersionList
  478. for i, version := range vl.Versions {
  479. fk = db.deviceKeyInto(fk, folder, version.Device, name)
  480. _, err := t.Get(fk, nil)
  481. if err == leveldb.ErrNotFound {
  482. continue
  483. }
  484. if err != nil {
  485. l.Debugln("surprise error:", err)
  486. return
  487. }
  488. newVL.Versions = append(newVL.Versions, version)
  489. if i == 0 {
  490. if fi, ok := db.getFile(fk); ok {
  491. meta.addFile(globalDeviceID, fi)
  492. }
  493. }
  494. }
  495. if len(newVL.Versions) != len(vl.Versions) {
  496. t.Put(dbi.Key(), mustMarshal(&newVL))
  497. t.checkFlush()
  498. }
  499. }
  500. l.Debugf("db check completed for %q", folder)
  501. }
  502. // deviceKey returns a byte slice encoding the following information:
  503. // keyTypeDevice (1 byte)
  504. // folder (4 bytes)
  505. // device (4 bytes)
  506. // name (variable size)
  507. func (db *Instance) deviceKey(folder, device, file []byte) []byte {
  508. return db.deviceKeyInto(nil, folder, device, file)
  509. }
  510. func (db *Instance) deviceKeyInto(k, folder, device, file []byte) []byte {
  511. reqLen := keyPrefixLen + keyFolderLen + keyDeviceLen + len(file)
  512. k = resize(k, reqLen)
  513. k[0] = KeyTypeDevice
  514. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  515. binary.BigEndian.PutUint32(k[keyPrefixLen+keyFolderLen:], db.deviceIdx.ID(device))
  516. copy(k[keyPrefixLen+keyFolderLen+keyDeviceLen:], file)
  517. return k
  518. }
  519. // deviceKeyName returns the device ID from the key
  520. func (db *Instance) deviceKeyName(key []byte) []byte {
  521. return key[keyPrefixLen+keyFolderLen+keyDeviceLen:]
  522. }
  523. // deviceKeyFolder returns the folder name from the key
  524. func (db *Instance) deviceKeyFolder(key []byte) []byte {
  525. folder, ok := db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  526. if !ok {
  527. panic("bug: lookup of nonexistent folder ID")
  528. }
  529. return folder
  530. }
  531. // deviceKeyDevice returns the device ID from the key
  532. func (db *Instance) deviceKeyDevice(key []byte) []byte {
  533. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen+keyFolderLen:]))
  534. if !ok {
  535. panic("bug: lookup of nonexistent device ID")
  536. }
  537. return device
  538. }
  539. // globalKey returns a byte slice encoding the following information:
  540. // keyTypeGlobal (1 byte)
  541. // folder (4 bytes)
  542. // name (variable size)
  543. func (db *Instance) globalKey(folder, file []byte) []byte {
  544. return db.globalKeyInto(nil, folder, file)
  545. }
  546. func (db *Instance) globalKeyInto(gk, folder, file []byte) []byte {
  547. reqLen := keyPrefixLen + keyFolderLen + len(file)
  548. gk = resize(gk, reqLen)
  549. gk[0] = KeyTypeGlobal
  550. binary.BigEndian.PutUint32(gk[keyPrefixLen:], db.folderIdx.ID(folder))
  551. copy(gk[keyPrefixLen+keyFolderLen:], file)
  552. return gk
  553. }
  554. // globalKeyName returns the filename from the key
  555. func (db *Instance) globalKeyName(key []byte) []byte {
  556. return key[keyPrefixLen+keyFolderLen:]
  557. }
  558. // globalKeyFolder returns the folder name from the key
  559. func (db *Instance) globalKeyFolder(key []byte) ([]byte, bool) {
  560. return db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  561. }
  562. // needKey is a globalKey with a different prefix
  563. func (db *Instance) needKey(folder, file []byte) []byte {
  564. return db.needKeyInto(nil, folder, file)
  565. }
  566. func (db *Instance) needKeyInto(k, folder, file []byte) []byte {
  567. k = db.globalKeyInto(k, folder, file)
  568. k[0] = KeyTypeNeed
  569. return k
  570. }
  571. // sequenceKey returns a byte slice encoding the following information:
  572. // KeyTypeSequence (1 byte)
  573. // folder (4 bytes)
  574. // sequence number (8 bytes)
  575. func (db *Instance) sequenceKey(folder []byte, seq int64) []byte {
  576. return db.sequenceKeyInto(nil, folder, seq)
  577. }
  578. func (db *Instance) sequenceKeyInto(k []byte, folder []byte, seq int64) []byte {
  579. reqLen := keyPrefixLen + keyFolderLen + keySequenceLen
  580. k = resize(k, reqLen)
  581. k[0] = KeyTypeSequence
  582. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  583. binary.BigEndian.PutUint64(k[keyPrefixLen+keyFolderLen:], uint64(seq))
  584. return k
  585. }
  586. // sequenceKeySequence returns the sequence number from the key
  587. func (db *Instance) sequenceKeySequence(key []byte) int64 {
  588. return int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:]))
  589. }
  590. func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
  591. key := db.indexIDKey(device, folder)
  592. cur, err := db.Get(key, nil)
  593. if err != nil {
  594. return 0
  595. }
  596. var id protocol.IndexID
  597. if err := id.Unmarshal(cur); err != nil {
  598. return 0
  599. }
  600. return id
  601. }
  602. func (db *Instance) setIndexID(device, folder []byte, id protocol.IndexID) {
  603. key := db.indexIDKey(device, folder)
  604. bs, _ := id.Marshal() // marshalling can't fail
  605. if err := db.Put(key, bs, nil); err != nil {
  606. panic("storing index ID: " + err.Error())
  607. }
  608. }
  609. func (db *Instance) indexIDKey(device, folder []byte) []byte {
  610. k := make([]byte, keyPrefixLen+keyDeviceLen+keyFolderLen)
  611. k[0] = KeyTypeIndexID
  612. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.deviceIdx.ID(device))
  613. binary.BigEndian.PutUint32(k[keyPrefixLen+keyDeviceLen:], db.folderIdx.ID(folder))
  614. return k
  615. }
  616. func (db *Instance) indexIDDevice(key []byte) []byte {
  617. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  618. if !ok {
  619. // uuh ...
  620. return nil
  621. }
  622. return device
  623. }
  624. func (db *Instance) mtimesKey(folder []byte) []byte {
  625. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  626. prefix[0] = KeyTypeVirtualMtime
  627. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  628. return prefix
  629. }
  630. func (db *Instance) folderMetaKey(folder []byte) []byte {
  631. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  632. prefix[0] = KeyTypeFolderMeta
  633. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  634. return prefix
  635. }
  636. // DropLocalDeltaIndexIDs removes all index IDs for the local device ID from
  637. // the database. This will cause a full index transmission on the next
  638. // connection.
  639. func (db *Instance) DropLocalDeltaIndexIDs() {
  640. db.dropDeltaIndexIDs(true)
  641. }
  642. // DropRemoteDeltaIndexIDs removes all index IDs for the other devices than
  643. // the local one from the database. This will cause them to send us a full
  644. // index on the next connection.
  645. func (db *Instance) DropRemoteDeltaIndexIDs() {
  646. db.dropDeltaIndexIDs(false)
  647. }
  648. func (db *Instance) dropDeltaIndexIDs(local bool) {
  649. t := db.newReadWriteTransaction()
  650. defer t.close()
  651. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeIndexID}), nil)
  652. defer dbi.Release()
  653. for dbi.Next() {
  654. device := db.indexIDDevice(dbi.Key())
  655. if bytes.Equal(device, protocol.LocalDeviceID[:]) == local {
  656. t.Delete(dbi.Key())
  657. }
  658. }
  659. }
  660. func (db *Instance) dropMtimes(folder []byte) {
  661. db.dropPrefix(db.mtimesKey(folder))
  662. }
  663. func (db *Instance) dropFolderMeta(folder []byte) {
  664. db.dropPrefix(db.folderMetaKey(folder))
  665. }
  666. func (db *Instance) dropPrefix(prefix []byte) {
  667. t := db.newReadWriteTransaction()
  668. defer t.close()
  669. dbi := t.NewIterator(util.BytesPrefix(prefix), nil)
  670. defer dbi.Release()
  671. for dbi.Next() {
  672. t.Delete(dbi.Key())
  673. }
  674. }
  675. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  676. if truncate {
  677. var tf FileInfoTruncated
  678. err := tf.Unmarshal(bs)
  679. return tf, err
  680. }
  681. var tf protocol.FileInfo
  682. err := tf.Unmarshal(bs)
  683. return tf, err
  684. }
  685. func unmarshalVersionList(data []byte) (VersionList, bool) {
  686. var vl VersionList
  687. if err := vl.Unmarshal(data); err != nil {
  688. l.Debugln("unmarshal error:", err)
  689. return VersionList{}, false
  690. }
  691. if len(vl.Versions) == 0 {
  692. l.Debugln("empty version list")
  693. return VersionList{}, false
  694. }
  695. return vl, true
  696. }
  697. // A "better" version of leveldb's errors.IsCorrupted.
  698. func leveldbIsCorrupted(err error) bool {
  699. switch {
  700. case err == nil:
  701. return false
  702. case errors.IsCorrupted(err):
  703. return true
  704. case strings.Contains(err.Error(), "corrupted"):
  705. return true
  706. }
  707. return false
  708. }
  709. // A smallIndex is an in memory bidirectional []byte to uint32 map. It gives
  710. // fast lookups in both directions and persists to the database. Don't use for
  711. // storing more items than fit comfortably in RAM.
  712. type smallIndex struct {
  713. db *Instance
  714. prefix []byte
  715. id2val map[uint32]string
  716. val2id map[string]uint32
  717. nextID uint32
  718. mut sync.Mutex
  719. }
  720. func newSmallIndex(db *Instance, prefix []byte) *smallIndex {
  721. idx := &smallIndex{
  722. db: db,
  723. prefix: prefix,
  724. id2val: make(map[uint32]string),
  725. val2id: make(map[string]uint32),
  726. mut: sync.NewMutex(),
  727. }
  728. idx.load()
  729. return idx
  730. }
  731. // load iterates over the prefix space in the database and populates the in
  732. // memory maps.
  733. func (i *smallIndex) load() {
  734. tr := i.db.newReadOnlyTransaction()
  735. it := tr.NewIterator(util.BytesPrefix(i.prefix), nil)
  736. for it.Next() {
  737. val := string(it.Value())
  738. id := binary.BigEndian.Uint32(it.Key()[len(i.prefix):])
  739. i.id2val[id] = val
  740. i.val2id[val] = id
  741. if id >= i.nextID {
  742. i.nextID = id + 1
  743. }
  744. }
  745. it.Release()
  746. tr.close()
  747. }
  748. // ID returns the index number for the given byte slice, allocating a new one
  749. // and persisting this to the database if necessary.
  750. func (i *smallIndex) ID(val []byte) uint32 {
  751. i.mut.Lock()
  752. // intentionally avoiding defer here as we want this call to be as fast as
  753. // possible in the general case (folder ID already exists). The map lookup
  754. // with the conversion of []byte to string is compiler optimized to not
  755. // copy the []byte, which is why we don't assign it to a temp variable
  756. // here.
  757. if id, ok := i.val2id[string(val)]; ok {
  758. i.mut.Unlock()
  759. return id
  760. }
  761. id := i.nextID
  762. i.nextID++
  763. valStr := string(val)
  764. i.val2id[valStr] = id
  765. i.id2val[id] = valStr
  766. key := make([]byte, len(i.prefix)+8) // prefix plus uint32 id
  767. copy(key, i.prefix)
  768. binary.BigEndian.PutUint32(key[len(i.prefix):], id)
  769. i.db.Put(key, val, nil)
  770. i.mut.Unlock()
  771. return id
  772. }
  773. // Val returns the value for the given index number, or (nil, false) if there
  774. // is no such index number.
  775. func (i *smallIndex) Val(id uint32) ([]byte, bool) {
  776. i.mut.Lock()
  777. val, ok := i.id2val[id]
  778. i.mut.Unlock()
  779. if !ok {
  780. return nil, false
  781. }
  782. return []byte(val), true
  783. }
  784. // resize returns a byte array of length reqLen, reusing k if possible
  785. func resize(k []byte, reqLen int) []byte {
  786. if cap(k) < reqLen {
  787. return make([]byte, reqLen)
  788. }
  789. return k[:reqLen]
  790. }
  791. type errorSuggestion struct {
  792. inner error
  793. suggestion string
  794. }
  795. func (e errorSuggestion) Error() string {
  796. return fmt.Sprintf("%s (%s)", e.inner.Error(), e.suggestion)
  797. }