leveldb_dbinstance.go 28 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "os"
  11. "sort"
  12. "strings"
  13. "sync/atomic"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sync"
  16. "github.com/syndtr/goleveldb/leveldb"
  17. "github.com/syndtr/goleveldb/leveldb/errors"
  18. "github.com/syndtr/goleveldb/leveldb/iterator"
  19. "github.com/syndtr/goleveldb/leveldb/opt"
  20. "github.com/syndtr/goleveldb/leveldb/storage"
  21. "github.com/syndtr/goleveldb/leveldb/util"
  22. )
  23. type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator)
  24. type Instance struct {
  25. committed int64 // this must be the first attribute in the struct to ensure 64 bit alignment on 32 bit plaforms
  26. *leveldb.DB
  27. location string
  28. folderIdx *smallIndex
  29. deviceIdx *smallIndex
  30. }
  31. const (
  32. keyPrefixLen = 1
  33. keyFolderLen = 4 // indexed
  34. keyDeviceLen = 4 // indexed
  35. keySequenceLen = 8
  36. keyHashLen = 32
  37. maxInt64 int64 = 1<<63 - 1
  38. )
  39. func Open(file string) (*Instance, error) {
  40. opts := &opt.Options{
  41. OpenFilesCacheCapacity: 100,
  42. WriteBuffer: 4 << 20,
  43. }
  44. db, err := leveldb.OpenFile(file, opts)
  45. if leveldbIsCorrupted(err) {
  46. db, err = leveldb.RecoverFile(file, opts)
  47. }
  48. if leveldbIsCorrupted(err) {
  49. // The database is corrupted, and we've tried to recover it but it
  50. // didn't work. At this point there isn't much to do beyond dropping
  51. // the database and reindexing...
  52. l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
  53. if err := os.RemoveAll(file); err != nil {
  54. return nil, err
  55. }
  56. db, err = leveldb.OpenFile(file, opts)
  57. }
  58. if err != nil {
  59. return nil, err
  60. }
  61. return newDBInstance(db, file), nil
  62. }
  63. func OpenMemory() *Instance {
  64. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  65. return newDBInstance(db, "<memory>")
  66. }
  67. func newDBInstance(db *leveldb.DB, location string) *Instance {
  68. i := &Instance{
  69. DB: db,
  70. location: location,
  71. }
  72. i.folderIdx = newSmallIndex(i, []byte{KeyTypeFolderIdx})
  73. i.deviceIdx = newSmallIndex(i, []byte{KeyTypeDeviceIdx})
  74. return i
  75. }
  76. // UpdateSchema does transitions to the current db version if necessary
  77. func (db *Instance) UpdateSchema() {
  78. miscDB := NewNamespacedKV(db, string(KeyTypeMiscData))
  79. prevVersion, _ := miscDB.Int64("dbVersion")
  80. if prevVersion >= dbVersion {
  81. return
  82. }
  83. l.Infof("Updating database schema version from %v to %v...", prevVersion, dbVersion)
  84. if prevVersion == 0 {
  85. db.updateSchema0to1()
  86. }
  87. if prevVersion <= 1 {
  88. db.updateSchema1to2()
  89. }
  90. if prevVersion <= 2 {
  91. db.updateSchema2to3()
  92. }
  93. l.Infof("Finished updating database schema version from %v to %v", prevVersion, dbVersion)
  94. miscDB.PutInt64("dbVersion", dbVersion)
  95. }
  96. // Committed returns the number of items committed to the database since startup
  97. func (db *Instance) Committed() int64 {
  98. return atomic.LoadInt64(&db.committed)
  99. }
  100. // Location returns the filesystem path where the database is stored
  101. func (db *Instance) Location() string {
  102. return db.location
  103. }
  104. func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
  105. t := db.newReadWriteTransaction()
  106. defer t.close()
  107. var fk []byte
  108. var gk []byte
  109. for _, f := range fs {
  110. name := []byte(f.Name)
  111. fk = db.deviceKeyInto(fk, folder, device, name)
  112. // Get and unmarshal the file entry. If it doesn't exist or can't be
  113. // unmarshalled we'll add it as a new entry.
  114. bs, err := t.Get(fk, nil)
  115. var ef FileInfoTruncated
  116. if err == nil {
  117. err = ef.Unmarshal(bs)
  118. }
  119. // The Invalid flag might change without the version being bumped.
  120. if err == nil && ef.Version.Equal(f.Version) && ef.Invalid == f.Invalid {
  121. continue
  122. }
  123. devID := protocol.DeviceIDFromBytes(device)
  124. if err == nil {
  125. meta.removeFile(devID, ef)
  126. }
  127. meta.addFile(devID, f)
  128. t.insertFile(fk, folder, device, f)
  129. gk = db.globalKeyInto(gk, folder, name)
  130. t.updateGlobal(gk, folder, device, f, meta)
  131. // Write out and reuse the batch every few records, to avoid the batch
  132. // growing too large and thus allocating unnecessarily much memory.
  133. t.checkFlush()
  134. }
  135. }
  136. func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) {
  137. t := db.newReadWriteTransaction()
  138. defer t.close()
  139. var sk []byte
  140. var dk []byte
  141. for _, f := range fs {
  142. sk = db.sequenceKeyInto(sk, folder, f.Sequence)
  143. dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name))
  144. t.Put(sk, dk)
  145. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  146. t.checkFlush()
  147. }
  148. }
  149. func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
  150. t := db.newReadWriteTransaction()
  151. defer t.close()
  152. var sk []byte
  153. for _, f := range fs {
  154. t.Delete(db.sequenceKeyInto(sk, folder, f.Sequence))
  155. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  156. t.checkFlush()
  157. }
  158. }
  159. func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
  160. if len(prefix) > 0 {
  161. unslashedPrefix := prefix
  162. if bytes.HasSuffix(prefix, []byte{'/'}) {
  163. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  164. } else {
  165. prefix = append(prefix, '/')
  166. }
  167. if f, ok := db.getFileTrunc(db.deviceKey(folder, device, unslashedPrefix), true); ok && !fn(f) {
  168. return
  169. }
  170. }
  171. t := db.newReadOnlyTransaction()
  172. defer t.close()
  173. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, prefix)[:keyPrefixLen+keyFolderLen+keyDeviceLen+len(prefix)]), nil)
  174. defer dbi.Release()
  175. for dbi.Next() {
  176. name := db.deviceKeyName(dbi.Key())
  177. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  178. return
  179. }
  180. // The iterator function may keep a reference to the unmarshalled
  181. // struct, which in turn references the buffer it was unmarshalled
  182. // from. dbi.Value() just returns an internal slice that it reuses, so
  183. // we need to copy it.
  184. f, err := unmarshalTrunc(append([]byte{}, dbi.Value()...), truncate)
  185. if err != nil {
  186. l.Debugln("unmarshal error:", err)
  187. continue
  188. }
  189. if !fn(f) {
  190. return
  191. }
  192. }
  193. }
  194. func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
  195. t := db.newReadOnlyTransaction()
  196. defer t.close()
  197. dbi := t.NewIterator(&util.Range{Start: db.sequenceKey(folder, startSeq), Limit: db.sequenceKey(folder, maxInt64)}, nil)
  198. defer dbi.Release()
  199. for dbi.Next() {
  200. f, ok := db.getFile(dbi.Value())
  201. if !ok {
  202. l.Debugln("missing file for sequence number", db.sequenceKeySequence(dbi.Key()))
  203. continue
  204. }
  205. if !fn(f) {
  206. return
  207. }
  208. }
  209. }
  210. func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  211. t := db.newReadWriteTransaction()
  212. defer t.close()
  213. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen]), nil)
  214. defer dbi.Release()
  215. var gk []byte
  216. for dbi.Next() {
  217. device := db.deviceKeyDevice(dbi.Key())
  218. var f FileInfoTruncated
  219. // The iterator function may keep a reference to the unmarshalled
  220. // struct, which in turn references the buffer it was unmarshalled
  221. // from. dbi.Value() just returns an internal slice that it reuses, so
  222. // we need to copy it.
  223. err := f.Unmarshal(append([]byte{}, dbi.Value()...))
  224. if err != nil {
  225. l.Debugln("unmarshal error:", err)
  226. continue
  227. }
  228. switch f.Name {
  229. case "", ".", "..", "/": // A few obviously invalid filenames
  230. l.Infof("Dropping invalid filename %q from database", f.Name)
  231. name := []byte(f.Name)
  232. gk = db.globalKeyInto(gk, folder, name)
  233. t.removeFromGlobal(gk, folder, device, name, nil)
  234. t.Delete(dbi.Key())
  235. t.checkFlush()
  236. continue
  237. }
  238. if !fn(device, f) {
  239. return
  240. }
  241. }
  242. }
  243. func (db *Instance) getFile(key []byte) (protocol.FileInfo, bool) {
  244. if f, ok := db.getFileTrunc(key, false); ok {
  245. return f.(protocol.FileInfo), true
  246. }
  247. return protocol.FileInfo{}, false
  248. }
  249. func (db *Instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
  250. bs, err := db.Get(key, nil)
  251. if err == leveldb.ErrNotFound {
  252. return nil, false
  253. }
  254. if err != nil {
  255. l.Debugln("surprise error:", err)
  256. return nil, false
  257. }
  258. f, err := unmarshalTrunc(bs, trunc)
  259. if err != nil {
  260. l.Debugln("unmarshal error:", err)
  261. return nil, false
  262. }
  263. return f, true
  264. }
  265. func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
  266. t := db.newReadOnlyTransaction()
  267. defer t.close()
  268. _, _, f, ok := db.getGlobalInto(t, nil, nil, folder, file, truncate)
  269. return f, ok
  270. }
  271. func (db *Instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []byte, truncate bool) ([]byte, []byte, FileIntf, bool) {
  272. gk = db.globalKeyInto(gk, folder, file)
  273. bs, err := t.Get(gk, nil)
  274. if err != nil {
  275. return gk, dk, nil, false
  276. }
  277. vl, ok := unmarshalVersionList(bs)
  278. if !ok {
  279. return gk, dk, nil, false
  280. }
  281. dk = db.deviceKeyInto(dk, folder, vl.Versions[0].Device, file)
  282. if fi, ok := db.getFileTrunc(dk, truncate); ok {
  283. return gk, dk, fi, true
  284. }
  285. return gk, dk, nil, false
  286. }
  287. func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
  288. if len(prefix) > 0 {
  289. unslashedPrefix := prefix
  290. if bytes.HasSuffix(prefix, []byte{'/'}) {
  291. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  292. } else {
  293. prefix = append(prefix, '/')
  294. }
  295. if f, ok := db.getGlobal(folder, unslashedPrefix, truncate); ok && !fn(f) {
  296. return
  297. }
  298. }
  299. t := db.newReadOnlyTransaction()
  300. defer t.close()
  301. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, prefix)), nil)
  302. defer dbi.Release()
  303. var fk []byte
  304. for dbi.Next() {
  305. name := db.globalKeyName(dbi.Key())
  306. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  307. return
  308. }
  309. vl, ok := unmarshalVersionList(dbi.Value())
  310. if !ok {
  311. continue
  312. }
  313. fk = db.deviceKeyInto(fk, folder, vl.Versions[0].Device, name)
  314. f, ok := db.getFileTrunc(fk, truncate)
  315. if !ok {
  316. continue
  317. }
  318. if !fn(f) {
  319. return
  320. }
  321. }
  322. }
  323. func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
  324. k := db.globalKey(folder, file)
  325. bs, err := db.Get(k, nil)
  326. if err == leveldb.ErrNotFound {
  327. return nil
  328. }
  329. if err != nil {
  330. l.Debugln("surprise error:", err)
  331. return nil
  332. }
  333. vl, ok := unmarshalVersionList(bs)
  334. if !ok {
  335. return nil
  336. }
  337. var devices []protocol.DeviceID
  338. for _, v := range vl.Versions {
  339. if !v.Version.Equal(vl.Versions[0].Version) {
  340. break
  341. }
  342. if v.Invalid {
  343. continue
  344. }
  345. n := protocol.DeviceIDFromBytes(v.Device)
  346. devices = append(devices, n)
  347. }
  348. return devices
  349. }
  350. func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
  351. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  352. db.withNeedLocal(folder, truncate, fn)
  353. return
  354. }
  355. t := db.newReadOnlyTransaction()
  356. defer t.close()
  357. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  358. defer dbi.Release()
  359. var fk []byte
  360. for dbi.Next() {
  361. vl, ok := unmarshalVersionList(dbi.Value())
  362. if !ok {
  363. continue
  364. }
  365. haveFV, have := vl.Get(device)
  366. // XXX: This marks Concurrent (i.e. conflicting) changes as
  367. // needs. Maybe we should do that, but it needs special
  368. // handling in the puller.
  369. if have && haveFV.Version.GreaterEqual(vl.Versions[0].Version) {
  370. continue
  371. }
  372. name := db.globalKeyName(dbi.Key())
  373. needVersion := vl.Versions[0].Version
  374. needDevice := protocol.DeviceIDFromBytes(vl.Versions[0].Device)
  375. for i := range vl.Versions {
  376. if !vl.Versions[i].Version.Equal(needVersion) {
  377. // We haven't found a valid copy of the file with the needed version.
  378. break
  379. }
  380. if vl.Versions[i].Invalid {
  381. // The file is marked invalid, don't use it.
  382. continue
  383. }
  384. fk = db.deviceKeyInto(fk, folder, vl.Versions[i].Device, name)
  385. bs, err := t.Get(fk, nil)
  386. if err != nil {
  387. l.Debugln("surprise error:", err)
  388. continue
  389. }
  390. gf, err := unmarshalTrunc(bs, truncate)
  391. if err != nil {
  392. l.Debugln("unmarshal error:", err)
  393. continue
  394. }
  395. if gf.IsDeleted() && !have {
  396. // We don't need deleted files that we don't have
  397. break
  398. }
  399. l.Debugf("need folder=%q device=%v name=%q have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, have, haveFV.Invalid, haveFV.Version, needVersion, needDevice)
  400. if !fn(gf) {
  401. return
  402. }
  403. // This file is handled, no need to look further in the version list
  404. break
  405. }
  406. }
  407. }
  408. func (db *Instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
  409. t := db.newReadOnlyTransaction()
  410. defer t.close()
  411. dbi := t.NewIterator(util.BytesPrefix(db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  412. defer dbi.Release()
  413. var dk []byte
  414. var gk []byte
  415. var f FileIntf
  416. var ok bool
  417. for dbi.Next() {
  418. gk, dk, f, ok = db.getGlobalInto(t, gk, dk, folder, db.globalKeyName(dbi.Key()), truncate)
  419. if !ok {
  420. continue
  421. }
  422. if !fn(f) {
  423. return
  424. }
  425. }
  426. }
  427. func (db *Instance) ListFolders() []string {
  428. t := db.newReadOnlyTransaction()
  429. defer t.close()
  430. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  431. defer dbi.Release()
  432. folderExists := make(map[string]bool)
  433. for dbi.Next() {
  434. folder, ok := db.globalKeyFolder(dbi.Key())
  435. if ok && !folderExists[string(folder)] {
  436. folderExists[string(folder)] = true
  437. }
  438. }
  439. folders := make([]string, 0, len(folderExists))
  440. for k := range folderExists {
  441. folders = append(folders, k)
  442. }
  443. sort.Strings(folders)
  444. return folders
  445. }
  446. func (db *Instance) dropFolder(folder []byte) {
  447. t := db.newReadWriteTransaction()
  448. defer t.close()
  449. for _, key := range [][]byte{
  450. // Remove all items related to the given folder from the device->file bucket
  451. db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen],
  452. // Remove all sequences related to the folder
  453. db.sequenceKey([]byte(folder), 0)[:keyPrefixLen+keyFolderLen],
  454. // Remove all items related to the given folder from the global bucket
  455. db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen],
  456. // Remove all needs related to the folder
  457. db.needKey(folder, nil)[:keyPrefixLen+keyFolderLen],
  458. } {
  459. t.deleteKeyPrefix(key)
  460. }
  461. }
  462. func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
  463. t := db.newReadWriteTransaction()
  464. defer t.close()
  465. dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, nil)), nil)
  466. defer dbi.Release()
  467. var gk []byte
  468. for dbi.Next() {
  469. key := dbi.Key()
  470. name := db.deviceKeyName(key)
  471. gk = db.globalKeyInto(gk, folder, name)
  472. t.removeFromGlobal(gk, folder, device, name, meta)
  473. t.Delete(key)
  474. t.checkFlush()
  475. }
  476. }
  477. func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
  478. t := db.newReadWriteTransaction()
  479. defer t.close()
  480. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, nil)[:keyPrefixLen+keyFolderLen]), nil)
  481. defer dbi.Release()
  482. var fk []byte
  483. for dbi.Next() {
  484. vl, ok := unmarshalVersionList(dbi.Value())
  485. if !ok {
  486. continue
  487. }
  488. // Check the global version list for consistency. An issue in previous
  489. // versions of goleveldb could result in reordered writes so that
  490. // there are global entries pointing to no longer existing files. Here
  491. // we find those and clear them out.
  492. name := db.globalKeyName(dbi.Key())
  493. var newVL VersionList
  494. for i, version := range vl.Versions {
  495. fk = db.deviceKeyInto(fk, folder, version.Device, name)
  496. _, err := t.Get(fk, nil)
  497. if err == leveldb.ErrNotFound {
  498. continue
  499. }
  500. if err != nil {
  501. l.Debugln("surprise error:", err)
  502. return
  503. }
  504. newVL.Versions = append(newVL.Versions, version)
  505. if i == 0 {
  506. if fi, ok := db.getFile(fk); ok {
  507. meta.addFile(globalDeviceID, fi)
  508. }
  509. }
  510. }
  511. if len(newVL.Versions) != len(vl.Versions) {
  512. t.Put(dbi.Key(), mustMarshal(&newVL))
  513. t.checkFlush()
  514. }
  515. }
  516. l.Debugf("db check completed for %q", folder)
  517. }
  518. func (db *Instance) updateSchema0to1() {
  519. t := db.newReadWriteTransaction()
  520. defer t.close()
  521. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeDevice}), nil)
  522. defer dbi.Release()
  523. symlinkConv := 0
  524. changedFolders := make(map[string]struct{})
  525. ignAdded := 0
  526. meta := newMetadataTracker() // dummy metadata tracker
  527. var gk []byte
  528. for dbi.Next() {
  529. folder := db.deviceKeyFolder(dbi.Key())
  530. device := db.deviceKeyDevice(dbi.Key())
  531. name := db.deviceKeyName(dbi.Key())
  532. // Remove files with absolute path (see #4799)
  533. if strings.HasPrefix(string(name), "/") {
  534. if _, ok := changedFolders[string(folder)]; !ok {
  535. changedFolders[string(folder)] = struct{}{}
  536. }
  537. gk = db.globalKeyInto(gk, folder, name)
  538. t.removeFromGlobal(gk, folder, device, nil, nil)
  539. t.Delete(dbi.Key())
  540. t.checkFlush()
  541. continue
  542. }
  543. // Change SYMLINK_FILE and SYMLINK_DIRECTORY types to the current SYMLINK
  544. // type (previously SYMLINK_UNKNOWN). It does this for all devices, both
  545. // local and remote, and does not reset delta indexes. It shouldn't really
  546. // matter what the symlink type is, but this cleans it up for a possible
  547. // future when SYMLINK_FILE and SYMLINK_DIRECTORY are no longer understood.
  548. var f protocol.FileInfo
  549. if err := f.Unmarshal(dbi.Value()); err != nil {
  550. // probably can't happen
  551. continue
  552. }
  553. if f.Type == protocol.FileInfoTypeDeprecatedSymlinkDirectory || f.Type == protocol.FileInfoTypeDeprecatedSymlinkFile {
  554. f.Type = protocol.FileInfoTypeSymlink
  555. bs, err := f.Marshal()
  556. if err != nil {
  557. panic("can't happen: " + err.Error())
  558. }
  559. t.Put(dbi.Key(), bs)
  560. t.checkFlush()
  561. symlinkConv++
  562. }
  563. // Add invalid files to global list
  564. if f.Invalid {
  565. gk = db.globalKeyInto(gk, folder, name)
  566. if t.updateGlobal(gk, folder, device, f, meta) {
  567. if _, ok := changedFolders[string(folder)]; !ok {
  568. changedFolders[string(folder)] = struct{}{}
  569. }
  570. ignAdded++
  571. }
  572. }
  573. }
  574. for folder := range changedFolders {
  575. db.dropFolderMeta([]byte(folder))
  576. }
  577. l.Infof("Updated symlink type for %d index entries and added %d invalid files to global list", symlinkConv, ignAdded)
  578. }
  579. // updateSchema1to2 introduces a sequenceKey->deviceKey bucket for local items
  580. // to allow iteration in sequence order (simplifies sending indexes).
  581. func (db *Instance) updateSchema1to2() {
  582. t := db.newReadWriteTransaction()
  583. defer t.close()
  584. var sk []byte
  585. var dk []byte
  586. for _, folderStr := range db.ListFolders() {
  587. folder := []byte(folderStr)
  588. db.withHave(folder, protocol.LocalDeviceID[:], nil, true, func(f FileIntf) bool {
  589. sk = db.sequenceKeyInto(sk, folder, f.SequenceNo())
  590. dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.FileName()))
  591. t.Put(sk, dk)
  592. t.checkFlush()
  593. return true
  594. })
  595. }
  596. }
  597. // updateSchema2to3 introduces a needKey->nil bucket for locally needed files.
  598. func (db *Instance) updateSchema2to3() {
  599. t := db.newReadWriteTransaction()
  600. defer t.close()
  601. var nk []byte
  602. var dk []byte
  603. for _, folderStr := range db.ListFolders() {
  604. folder := []byte(folderStr)
  605. db.withGlobal(folder, nil, true, func(f FileIntf) bool {
  606. name := []byte(f.FileName())
  607. dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], name)
  608. var v protocol.Vector
  609. haveFile, ok := db.getFileTrunc(dk, true)
  610. if ok {
  611. v = haveFile.FileVersion()
  612. }
  613. if !need(f, ok, v) {
  614. return true
  615. }
  616. nk = t.db.needKeyInto(nk, folder, []byte(f.FileName()))
  617. t.Put(nk, nil)
  618. t.checkFlush()
  619. return true
  620. })
  621. }
  622. }
  623. // deviceKey returns a byte slice encoding the following information:
  624. // keyTypeDevice (1 byte)
  625. // folder (4 bytes)
  626. // device (4 bytes)
  627. // name (variable size)
  628. func (db *Instance) deviceKey(folder, device, file []byte) []byte {
  629. return db.deviceKeyInto(nil, folder, device, file)
  630. }
  631. func (db *Instance) deviceKeyInto(k, folder, device, file []byte) []byte {
  632. reqLen := keyPrefixLen + keyFolderLen + keyDeviceLen + len(file)
  633. k = resize(k, reqLen)
  634. k[0] = KeyTypeDevice
  635. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  636. binary.BigEndian.PutUint32(k[keyPrefixLen+keyFolderLen:], db.deviceIdx.ID(device))
  637. copy(k[keyPrefixLen+keyFolderLen+keyDeviceLen:], file)
  638. return k
  639. }
  640. // deviceKeyName returns the device ID from the key
  641. func (db *Instance) deviceKeyName(key []byte) []byte {
  642. return key[keyPrefixLen+keyFolderLen+keyDeviceLen:]
  643. }
  644. // deviceKeyFolder returns the folder name from the key
  645. func (db *Instance) deviceKeyFolder(key []byte) []byte {
  646. folder, ok := db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  647. if !ok {
  648. panic("bug: lookup of nonexistent folder ID")
  649. }
  650. return folder
  651. }
  652. // deviceKeyDevice returns the device ID from the key
  653. func (db *Instance) deviceKeyDevice(key []byte) []byte {
  654. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen+keyFolderLen:]))
  655. if !ok {
  656. panic("bug: lookup of nonexistent device ID")
  657. }
  658. return device
  659. }
  660. // globalKey returns a byte slice encoding the following information:
  661. // keyTypeGlobal (1 byte)
  662. // folder (4 bytes)
  663. // name (variable size)
  664. func (db *Instance) globalKey(folder, file []byte) []byte {
  665. return db.globalKeyInto(nil, folder, file)
  666. }
  667. func (db *Instance) globalKeyInto(gk, folder, file []byte) []byte {
  668. reqLen := keyPrefixLen + keyFolderLen + len(file)
  669. gk = resize(gk, reqLen)
  670. gk[0] = KeyTypeGlobal
  671. binary.BigEndian.PutUint32(gk[keyPrefixLen:], db.folderIdx.ID(folder))
  672. copy(gk[keyPrefixLen+keyFolderLen:], file)
  673. return gk
  674. }
  675. // globalKeyName returns the filename from the key
  676. func (db *Instance) globalKeyName(key []byte) []byte {
  677. return key[keyPrefixLen+keyFolderLen:]
  678. }
  679. // globalKeyFolder returns the folder name from the key
  680. func (db *Instance) globalKeyFolder(key []byte) ([]byte, bool) {
  681. return db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  682. }
  683. // needKey is a globalKey with a different prefix
  684. func (db *Instance) needKey(folder, file []byte) []byte {
  685. return db.needKeyInto(nil, folder, file)
  686. }
  687. func (db *Instance) needKeyInto(k, folder, file []byte) []byte {
  688. k = db.globalKeyInto(k, folder, file)
  689. k[0] = KeyTypeNeed
  690. return k
  691. }
  692. // sequenceKey returns a byte slice encoding the following information:
  693. // KeyTypeSequence (1 byte)
  694. // folder (4 bytes)
  695. // sequence number (8 bytes)
  696. func (db *Instance) sequenceKey(folder []byte, seq int64) []byte {
  697. return db.sequenceKeyInto(nil, folder, seq)
  698. }
  699. func (db *Instance) sequenceKeyInto(k []byte, folder []byte, seq int64) []byte {
  700. reqLen := keyPrefixLen + keyFolderLen + keySequenceLen
  701. k = resize(k, reqLen)
  702. k[0] = KeyTypeSequence
  703. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
  704. binary.BigEndian.PutUint64(k[keyPrefixLen+keyFolderLen:], uint64(seq))
  705. return k
  706. }
  707. // sequenceKeySequence returns the sequence number from the key
  708. func (db *Instance) sequenceKeySequence(key []byte) int64 {
  709. return int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:]))
  710. }
  711. func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
  712. key := db.indexIDKey(device, folder)
  713. cur, err := db.Get(key, nil)
  714. if err != nil {
  715. return 0
  716. }
  717. var id protocol.IndexID
  718. if err := id.Unmarshal(cur); err != nil {
  719. return 0
  720. }
  721. return id
  722. }
  723. func (db *Instance) setIndexID(device, folder []byte, id protocol.IndexID) {
  724. key := db.indexIDKey(device, folder)
  725. bs, _ := id.Marshal() // marshalling can't fail
  726. if err := db.Put(key, bs, nil); err != nil {
  727. panic("storing index ID: " + err.Error())
  728. }
  729. }
  730. func (db *Instance) indexIDKey(device, folder []byte) []byte {
  731. k := make([]byte, keyPrefixLen+keyDeviceLen+keyFolderLen)
  732. k[0] = KeyTypeIndexID
  733. binary.BigEndian.PutUint32(k[keyPrefixLen:], db.deviceIdx.ID(device))
  734. binary.BigEndian.PutUint32(k[keyPrefixLen+keyDeviceLen:], db.folderIdx.ID(folder))
  735. return k
  736. }
  737. func (db *Instance) indexIDDevice(key []byte) []byte {
  738. device, ok := db.deviceIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
  739. if !ok {
  740. // uuh ...
  741. return nil
  742. }
  743. return device
  744. }
  745. func (db *Instance) mtimesKey(folder []byte) []byte {
  746. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  747. prefix[0] = KeyTypeVirtualMtime
  748. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  749. return prefix
  750. }
  751. func (db *Instance) folderMetaKey(folder []byte) []byte {
  752. prefix := make([]byte, 5) // key type + 4 bytes folder idx number
  753. prefix[0] = KeyTypeFolderMeta
  754. binary.BigEndian.PutUint32(prefix[1:], db.folderIdx.ID(folder))
  755. return prefix
  756. }
  757. // DropLocalDeltaIndexIDs removes all index IDs for the local device ID from
  758. // the database. This will cause a full index transmission on the next
  759. // connection.
  760. func (db *Instance) DropLocalDeltaIndexIDs() {
  761. db.dropDeltaIndexIDs(true)
  762. }
  763. // DropRemoteDeltaIndexIDs removes all index IDs for the other devices than
  764. // the local one from the database. This will cause them to send us a full
  765. // index on the next connection.
  766. func (db *Instance) DropRemoteDeltaIndexIDs() {
  767. db.dropDeltaIndexIDs(false)
  768. }
  769. func (db *Instance) dropDeltaIndexIDs(local bool) {
  770. t := db.newReadWriteTransaction()
  771. defer t.close()
  772. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeIndexID}), nil)
  773. defer dbi.Release()
  774. for dbi.Next() {
  775. device := db.indexIDDevice(dbi.Key())
  776. if bytes.Equal(device, protocol.LocalDeviceID[:]) == local {
  777. t.Delete(dbi.Key())
  778. }
  779. }
  780. }
  781. func (db *Instance) dropMtimes(folder []byte) {
  782. db.dropPrefix(db.mtimesKey(folder))
  783. }
  784. func (db *Instance) dropFolderMeta(folder []byte) {
  785. db.dropPrefix(db.folderMetaKey(folder))
  786. }
  787. func (db *Instance) dropPrefix(prefix []byte) {
  788. t := db.newReadWriteTransaction()
  789. defer t.close()
  790. dbi := t.NewIterator(util.BytesPrefix(prefix), nil)
  791. defer dbi.Release()
  792. for dbi.Next() {
  793. t.Delete(dbi.Key())
  794. }
  795. }
  796. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  797. if truncate {
  798. var tf FileInfoTruncated
  799. err := tf.Unmarshal(bs)
  800. return tf, err
  801. }
  802. var tf protocol.FileInfo
  803. err := tf.Unmarshal(bs)
  804. return tf, err
  805. }
  806. func unmarshalVersionList(data []byte) (VersionList, bool) {
  807. var vl VersionList
  808. if err := vl.Unmarshal(data); err != nil {
  809. l.Debugln("unmarshal error:", err)
  810. return VersionList{}, false
  811. }
  812. if len(vl.Versions) == 0 {
  813. l.Debugln("empty version list")
  814. return VersionList{}, false
  815. }
  816. return vl, true
  817. }
  818. // A "better" version of leveldb's errors.IsCorrupted.
  819. func leveldbIsCorrupted(err error) bool {
  820. switch {
  821. case err == nil:
  822. return false
  823. case errors.IsCorrupted(err):
  824. return true
  825. case strings.Contains(err.Error(), "corrupted"):
  826. return true
  827. }
  828. return false
  829. }
  830. // A smallIndex is an in memory bidirectional []byte to uint32 map. It gives
  831. // fast lookups in both directions and persists to the database. Don't use for
  832. // storing more items than fit comfortably in RAM.
  833. type smallIndex struct {
  834. db *Instance
  835. prefix []byte
  836. id2val map[uint32]string
  837. val2id map[string]uint32
  838. nextID uint32
  839. mut sync.Mutex
  840. }
  841. func newSmallIndex(db *Instance, prefix []byte) *smallIndex {
  842. idx := &smallIndex{
  843. db: db,
  844. prefix: prefix,
  845. id2val: make(map[uint32]string),
  846. val2id: make(map[string]uint32),
  847. mut: sync.NewMutex(),
  848. }
  849. idx.load()
  850. return idx
  851. }
  852. // load iterates over the prefix space in the database and populates the in
  853. // memory maps.
  854. func (i *smallIndex) load() {
  855. tr := i.db.newReadOnlyTransaction()
  856. it := tr.NewIterator(util.BytesPrefix(i.prefix), nil)
  857. for it.Next() {
  858. val := string(it.Value())
  859. id := binary.BigEndian.Uint32(it.Key()[len(i.prefix):])
  860. i.id2val[id] = val
  861. i.val2id[val] = id
  862. if id >= i.nextID {
  863. i.nextID = id + 1
  864. }
  865. }
  866. it.Release()
  867. tr.close()
  868. }
  869. // ID returns the index number for the given byte slice, allocating a new one
  870. // and persisting this to the database if necessary.
  871. func (i *smallIndex) ID(val []byte) uint32 {
  872. i.mut.Lock()
  873. // intentionally avoiding defer here as we want this call to be as fast as
  874. // possible in the general case (folder ID already exists). The map lookup
  875. // with the conversion of []byte to string is compiler optimized to not
  876. // copy the []byte, which is why we don't assign it to a temp variable
  877. // here.
  878. if id, ok := i.val2id[string(val)]; ok {
  879. i.mut.Unlock()
  880. return id
  881. }
  882. id := i.nextID
  883. i.nextID++
  884. valStr := string(val)
  885. i.val2id[valStr] = id
  886. i.id2val[id] = valStr
  887. key := make([]byte, len(i.prefix)+8) // prefix plus uint32 id
  888. copy(key, i.prefix)
  889. binary.BigEndian.PutUint32(key[len(i.prefix):], id)
  890. i.db.Put(key, val, nil)
  891. i.mut.Unlock()
  892. return id
  893. }
  894. // Val returns the value for the given index number, or (nil, false) if there
  895. // is no such index number.
  896. func (i *smallIndex) Val(id uint32) ([]byte, bool) {
  897. i.mut.Lock()
  898. val, ok := i.id2val[id]
  899. i.mut.Unlock()
  900. if !ok {
  901. return nil, false
  902. }
  903. return []byte(val), true
  904. }
  905. // resize returns a byte array of length reqLen, reusing k if possible
  906. func resize(k []byte, reqLen int) []byte {
  907. if cap(k) < reqLen {
  908. return make([]byte, reqLen)
  909. }
  910. return k[:reqLen]
  911. }