leveldb.go 16 KB


  1. package files
  2. import (
  3. "bytes"
  4. "runtime"
  5. "sort"
  6. "sync"
  7. "github.com/syncthing/syncthing/lamport"
  8. "github.com/syncthing/syncthing/protocol"
  9. "github.com/syndtr/goleveldb/leveldb"
  10. "github.com/syndtr/goleveldb/leveldb/iterator"
  11. "github.com/syndtr/goleveldb/leveldb/opt"
  12. "github.com/syndtr/goleveldb/leveldb/util"
  13. )
  // clockTick holds the value most recently returned by clock. It must only
  // be accessed while holding clockMut.
  var clockTick uint64

  // clockMut guards clockTick.
  var clockMut sync.Mutex
  18. func clock(v uint64) uint64 {
  19. clockMut.Lock()
  20. defer clockMut.Unlock()
  21. if v > clockTick {
  22. clockTick = v + 1
  23. } else {
  24. clockTick++
  25. }
  26. return clockTick
  27. }
  // Key type prefixes. The first byte of every database key is one of these
  // values (see nodeKey and globalKey).
  const (
  	keyTypeNode   = 0 // per-node file entries
  	keyTypeGlobal = 1 // global version lists
  )
  // fileVersion records the version of a file held by a single node. It is
  // the element type of versionList.
  type fileVersion struct {
  	// version is the file version announced by the node.
  	version uint64
  	// node is the raw node ID bytes.
  	node []byte
  }
  36. type versionList struct {
  37. versions []fileVersion
  38. }
  39. type fileList []protocol.FileInfo
  40. func (l fileList) Len() int {
  41. return len(l)
  42. }
  43. func (l fileList) Swap(a, b int) {
  44. l[a], l[b] = l[b], l[a]
  45. }
  46. func (l fileList) Less(a, b int) bool {
  47. return l[a].Name < l[b].Name
  48. }
  49. type dbReader interface {
  50. Get([]byte, *opt.ReadOptions) ([]byte, error)
  51. }
  // dbWriter is the write side of the database used by the insert/update
  // helpers. Within this file it is satisfied by *leveldb.Batch.
  type dbWriter interface {
  	// Put stores value under key.
  	Put(key, value []byte)
  	// Delete removes key.
  	Delete(key []byte)
  }
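  /*
  Database layout. Every key starts with a one-byte key type.

  Per-node file entries:

      key:   keyTypeNode (1 byte) | repository (64 bytes) | node (32 bytes) | name (variable size)
      value: protocol.FileInfo (XDR encoded)

  Global version lists:

      key:   keyTypeGlobal (1 byte) | repository (64 bytes) | name (variable size)
      value: []fileVersion (sorted, highest version first)
  */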
  69. func nodeKey(repo, node, file []byte) []byte {
  70. k := make([]byte, 1+64+32+len(file))
  71. k[0] = keyTypeNode
  72. copy(k[1:], []byte(repo))
  73. copy(k[1+64:], node[:])
  74. copy(k[1+64+32:], []byte(file))
  75. return k
  76. }
  77. func globalKey(repo, file []byte) []byte {
  78. k := make([]byte, 1+64+len(file))
  79. k[0] = keyTypeGlobal
  80. copy(k[1:], []byte(repo))
  81. copy(k[1+64:], []byte(file))
  82. return k
  83. }
  // nodeKeyName returns the file name part of a key built by nodeKey, i.e.
  // everything after the type byte, the 64-byte repo and the 32-byte node.
  // The result aliases key. It panics if key is shorter than that header.
  func nodeKeyName(key []byte) []byte {
  	const nameOffset = 1 + 64 + 32
  	return key[nameOffset:]
  }
  // nodeKeyRepo returns the repository name stored in a key built by nodeKey,
  // with the zero padding stripped. A name that fills the whole 64-byte field
  // has no padding byte and is returned in full (previously this case panicked
  // on a negative slice bound). The result aliases key.
  func nodeKeyRepo(key []byte) []byte {
  	repo := key[1 : 1+64]
  	if end := bytes.IndexByte(repo, 0); end >= 0 {
  		return repo[:end]
  	}
  	return repo
  }
  // nodeKeyNode returns the 32-byte node field of a key built by nodeKey. The
  // result aliases key.
  func nodeKeyNode(key []byte) []byte {
  	const nodeOffset = 1 + 64
  	return key[nodeOffset : nodeOffset+32]
  }
  // globalKeyName returns the file name part of a key built by globalKey, i.e.
  // everything after the type byte and the 64-byte repo. The result aliases
  // key. It panics if key is shorter than that header.
  func globalKeyName(key []byte) []byte {
  	const nameOffset = 1 + 64
  	return key[nameOffset:]
  }
  // globalKeyRepo returns the repository name stored in a key built by
  // globalKey, with the zero padding stripped. A name that fills the whole
  // 64-byte field has no padding byte and is returned in full (previously this
  // case panicked on a negative slice bound). The result aliases key.
  func globalKeyRepo(key []byte) []byte {
  	repo := key[1 : 1+64]
  	if end := bytes.IndexByte(repo, 0); end >= 0 {
  		return repo[:end]
  	}
  	return repo
  }
  103. type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64
  104. type fileIterator func(f protocol.FileIntf) bool
  105. func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) uint64 {
  106. defer runtime.GC()
  107. sort.Sort(fileList(fs)) // sort list on name, same as on disk
  108. start := nodeKey(repo, node, nil) // before all repo/node files
  109. limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  110. batch := new(leveldb.Batch)
  111. snap, err := db.GetSnapshot()
  112. if err != nil {
  113. panic(err)
  114. }
  115. defer snap.Release()
  116. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  117. defer dbi.Release()
  118. moreDb := dbi.Next()
  119. fsi := 0
  120. var maxLocalVer uint64
  121. for {
  122. var newName, oldName []byte
  123. moreFs := fsi < len(fs)
  124. if !moreDb && !moreFs {
  125. break
  126. }
  127. if !moreFs && deleteFn == nil {
  128. // We don't have any more updated files to process and deletion
  129. // has not been requested, so we can exit early
  130. break
  131. }
  132. if moreFs {
  133. newName = []byte(fs[fsi].Name)
  134. }
  135. if moreDb {
  136. oldName = nodeKeyName(dbi.Key())
  137. }
  138. cmp := bytes.Compare(newName, oldName)
  139. if debug {
  140. l.Debugf("generic replace; repo=%q node=%v moreFs=%v moreDb=%v cmp=%d newName=%q oldName=%q", repo, protocol.NodeIDFromBytes(node), moreFs, moreDb, cmp, newName, oldName)
  141. }
  142. switch {
  143. case moreFs && (!moreDb || cmp == -1):
  144. // Disk is missing this file. Insert it.
  145. if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
  146. maxLocalVer = lv
  147. }
  148. ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
  149. fsi++
  150. case moreFs && moreDb && cmp == 0:
  151. // File exists on both sides - compare versions.
  152. var ef protocol.FileInfoTruncated
  153. ef.UnmarshalXDR(dbi.Value())
  154. if fs[fsi].Version > ef.Version {
  155. if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
  156. maxLocalVer = lv
  157. }
  158. ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
  159. }
  160. // Iterate both sides.
  161. fsi++
  162. moreDb = dbi.Next()
  163. case moreDb && (!moreFs || cmp == 1):
  164. if deleteFn != nil {
  165. if lv := deleteFn(snap, batch, repo, node, oldName, dbi); lv > maxLocalVer {
  166. maxLocalVer = lv
  167. }
  168. }
  169. moreDb = dbi.Next()
  170. }
  171. }
  172. err = db.Write(batch, nil)
  173. if err != nil {
  174. panic(err)
  175. }
  176. return maxLocalVer
  177. }
  178. func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  179. // TODO: Return the remaining maxLocalVer?
  180. return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
  181. // Disk has files that we are missing. Remove it.
  182. if debug {
  183. l.Debugf("delete; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name)
  184. }
  185. ldbRemoveFromGlobal(db, batch, repo, node, name)
  186. batch.Delete(dbi.Key())
  187. return 0
  188. })
  189. }
  190. func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  191. return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
  192. var tf protocol.FileInfoTruncated
  193. err := tf.UnmarshalXDR(dbi.Value())
  194. if err != nil {
  195. panic(err)
  196. }
  197. if !tf.IsDeleted() {
  198. if debug {
  199. l.Debugf("mark deleted; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name)
  200. }
  201. ts := clock(tf.LocalVersion)
  202. f := protocol.FileInfo{
  203. Name: tf.Name,
  204. Version: lamport.Default.Tick(tf.Version),
  205. LocalVersion: ts,
  206. Flags: tf.Flags | protocol.FlagDeleted,
  207. Modified: tf.Modified,
  208. }
  209. batch.Put(dbi.Key(), f.MarshalXDR())
  210. ldbUpdateGlobal(db, batch, repo, node, nodeKeyName(dbi.Key()), f.Version)
  211. return ts
  212. }
  213. return 0
  214. })
  215. }
  216. func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  217. defer runtime.GC()
  218. batch := new(leveldb.Batch)
  219. snap, err := db.GetSnapshot()
  220. if err != nil {
  221. panic(err)
  222. }
  223. defer snap.Release()
  224. var maxLocalVer uint64
  225. for _, f := range fs {
  226. name := []byte(f.Name)
  227. fk := nodeKey(repo, node, name)
  228. bs, err := snap.Get(fk, nil)
  229. if err == leveldb.ErrNotFound {
  230. if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
  231. maxLocalVer = lv
  232. }
  233. ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
  234. continue
  235. }
  236. var ef protocol.FileInfoTruncated
  237. err = ef.UnmarshalXDR(bs)
  238. if err != nil {
  239. panic(err)
  240. }
  241. if ef.Version != f.Version {
  242. if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
  243. maxLocalVer = lv
  244. }
  245. ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
  246. }
  247. }
  248. err = db.Write(batch, nil)
  249. if err != nil {
  250. panic(err)
  251. }
  252. return maxLocalVer
  253. }
  254. func ldbInsert(batch dbWriter, repo, node, name []byte, file protocol.FileInfo) uint64 {
  255. if debug {
  256. l.Debugf("insert; repo=%q node=%v %v", repo, protocol.NodeIDFromBytes(node), file)
  257. }
  258. if file.LocalVersion == 0 {
  259. file.LocalVersion = clock(0)
  260. }
  261. nk := nodeKey(repo, node, name)
  262. batch.Put(nk, file.MarshalXDR())
  263. return file.LocalVersion
  264. }
  265. // ldbUpdateGlobal adds this node+version to the version list for the given
  266. // file. If the node is already present in the list, the version is updated.
  267. // If the file does not have an entry in the global list, it is created.
  268. func ldbUpdateGlobal(db dbReader, batch dbWriter, repo, node, file []byte, version uint64) bool {
  269. if debug {
  270. l.Debugf("update global; repo=%q node=%v file=%q version=%d", repo, protocol.NodeIDFromBytes(node), file, version)
  271. }
  272. gk := globalKey(repo, file)
  273. svl, err := db.Get(gk, nil)
  274. if err != nil && err != leveldb.ErrNotFound {
  275. panic(err)
  276. }
  277. var fl versionList
  278. nv := fileVersion{
  279. node: node,
  280. version: version,
  281. }
  282. if svl != nil {
  283. err = fl.UnmarshalXDR(svl)
  284. if err != nil {
  285. panic(err)
  286. }
  287. for i := range fl.versions {
  288. if bytes.Compare(fl.versions[i].node, node) == 0 {
  289. if fl.versions[i].version == version {
  290. // No need to do anything
  291. return false
  292. }
  293. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  294. break
  295. }
  296. }
  297. }
  298. for i := range fl.versions {
  299. if fl.versions[i].version <= version {
  300. t := append(fl.versions, fileVersion{})
  301. copy(t[i+1:], t[i:])
  302. t[i] = nv
  303. fl.versions = t
  304. goto done
  305. }
  306. }
  307. fl.versions = append(fl.versions, nv)
  308. done:
  309. batch.Put(gk, fl.MarshalXDR())
  310. return true
  311. }
  312. // ldbRemoveFromGlobal removes the node from the global version list for the
  313. // given file. If the version list is empty after this, the file entry is
  314. // removed entirely.
  315. func ldbRemoveFromGlobal(db dbReader, batch dbWriter, repo, node, file []byte) {
  316. if debug {
  317. l.Debugf("remove from global; repo=%q node=%v file=%q", repo, protocol.NodeIDFromBytes(node), file)
  318. }
  319. gk := globalKey(repo, file)
  320. svl, err := db.Get(gk, nil)
  321. if err != nil {
  322. panic(err)
  323. }
  324. var fl versionList
  325. err = fl.UnmarshalXDR(svl)
  326. if err != nil {
  327. panic(err)
  328. }
  329. for i := range fl.versions {
  330. if bytes.Compare(fl.versions[i].node, node) == 0 {
  331. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  332. break
  333. }
  334. }
  335. if len(fl.versions) == 0 {
  336. batch.Delete(gk)
  337. } else {
  338. batch.Put(gk, fl.MarshalXDR())
  339. }
  340. }
  341. func ldbWithHave(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) {
  342. start := nodeKey(repo, node, nil) // before all repo/node files
  343. limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  344. snap, err := db.GetSnapshot()
  345. if err != nil {
  346. panic(err)
  347. }
  348. defer snap.Release()
  349. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  350. defer dbi.Release()
  351. for dbi.Next() {
  352. f, err := unmarshalTrunc(dbi.Value(), truncate)
  353. if err != nil {
  354. panic(err)
  355. }
  356. if cont := fn(f); !cont {
  357. return
  358. }
  359. }
  360. }
  361. func ldbWithAllRepoTruncated(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol.FileInfoTruncated) bool) {
  362. defer runtime.GC()
  363. start := nodeKey(repo, nil, nil) // before all repo/node files
  364. limit := nodeKey(repo, protocol.LocalNodeID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  365. snap, err := db.GetSnapshot()
  366. if err != nil {
  367. panic(err)
  368. }
  369. defer snap.Release()
  370. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  371. defer dbi.Release()
  372. for dbi.Next() {
  373. node := nodeKeyNode(dbi.Key())
  374. var f protocol.FileInfoTruncated
  375. err := f.UnmarshalXDR(dbi.Value())
  376. if err != nil {
  377. panic(err)
  378. }
  379. if cont := fn(node, f); !cont {
  380. return
  381. }
  382. }
  383. }
  384. func ldbGet(db *leveldb.DB, repo, node, file []byte) protocol.FileInfo {
  385. nk := nodeKey(repo, node, file)
  386. bs, err := db.Get(nk, nil)
  387. if err == leveldb.ErrNotFound {
  388. return protocol.FileInfo{}
  389. }
  390. if err != nil {
  391. panic(err)
  392. }
  393. var f protocol.FileInfo
  394. err = f.UnmarshalXDR(bs)
  395. if err != nil {
  396. panic(err)
  397. }
  398. return f
  399. }
  400. func ldbGetGlobal(db *leveldb.DB, repo, file []byte) protocol.FileInfo {
  401. k := globalKey(repo, file)
  402. snap, err := db.GetSnapshot()
  403. if err != nil {
  404. panic(err)
  405. }
  406. defer snap.Release()
  407. bs, err := snap.Get(k, nil)
  408. if err == leveldb.ErrNotFound {
  409. return protocol.FileInfo{}
  410. }
  411. if err != nil {
  412. panic(err)
  413. }
  414. var vl versionList
  415. err = vl.UnmarshalXDR(bs)
  416. if err != nil {
  417. panic(err)
  418. }
  419. if len(vl.versions) == 0 {
  420. l.Debugln(k)
  421. panic("no versions?")
  422. }
  423. k = nodeKey(repo, vl.versions[0].node, file)
  424. bs, err = snap.Get(k, nil)
  425. if err != nil {
  426. panic(err)
  427. }
  428. var f protocol.FileInfo
  429. err = f.UnmarshalXDR(bs)
  430. if err != nil {
  431. panic(err)
  432. }
  433. return f
  434. }
  435. func ldbWithGlobal(db *leveldb.DB, repo []byte, truncate bool, fn fileIterator) {
  436. defer runtime.GC()
  437. start := globalKey(repo, nil)
  438. limit := globalKey(repo, []byte{0xff, 0xff, 0xff, 0xff})
  439. snap, err := db.GetSnapshot()
  440. if err != nil {
  441. panic(err)
  442. }
  443. defer snap.Release()
  444. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  445. defer dbi.Release()
  446. for dbi.Next() {
  447. var vl versionList
  448. err := vl.UnmarshalXDR(dbi.Value())
  449. if err != nil {
  450. panic(err)
  451. }
  452. if len(vl.versions) == 0 {
  453. l.Debugln(dbi.Key())
  454. panic("no versions?")
  455. }
  456. fk := nodeKey(repo, vl.versions[0].node, globalKeyName(dbi.Key()))
  457. bs, err := snap.Get(fk, nil)
  458. if err != nil {
  459. panic(err)
  460. }
  461. f, err := unmarshalTrunc(bs, truncate)
  462. if err != nil {
  463. panic(err)
  464. }
  465. if cont := fn(f); !cont {
  466. return
  467. }
  468. }
  469. }
  470. func ldbAvailability(db *leveldb.DB, repo, file []byte) []protocol.NodeID {
  471. k := globalKey(repo, file)
  472. bs, err := db.Get(k, nil)
  473. if err == leveldb.ErrNotFound {
  474. return nil
  475. }
  476. if err != nil {
  477. panic(err)
  478. }
  479. var vl versionList
  480. err = vl.UnmarshalXDR(bs)
  481. if err != nil {
  482. panic(err)
  483. }
  484. var nodes []protocol.NodeID
  485. for _, v := range vl.versions {
  486. if v.version != vl.versions[0].version {
  487. break
  488. }
  489. n := protocol.NodeIDFromBytes(v.node)
  490. nodes = append(nodes, n)
  491. }
  492. return nodes
  493. }
  494. func ldbWithNeed(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) {
  495. defer runtime.GC()
  496. start := globalKey(repo, nil)
  497. limit := globalKey(repo, []byte{0xff, 0xff, 0xff, 0xff})
  498. snap, err := db.GetSnapshot()
  499. if err != nil {
  500. panic(err)
  501. }
  502. defer snap.Release()
  503. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  504. defer dbi.Release()
  505. for dbi.Next() {
  506. var vl versionList
  507. err := vl.UnmarshalXDR(dbi.Value())
  508. if err != nil {
  509. panic(err)
  510. }
  511. if len(vl.versions) == 0 {
  512. l.Debugln(dbi.Key())
  513. panic("no versions?")
  514. }
  515. have := false // If we have the file, any version
  516. need := false // If we have a lower version of the file
  517. var haveVersion uint64
  518. for _, v := range vl.versions {
  519. if bytes.Compare(v.node, node) == 0 {
  520. have = true
  521. haveVersion = v.version
  522. need = v.version < vl.versions[0].version
  523. break
  524. }
  525. }
  526. if need || !have {
  527. name := globalKeyName(dbi.Key())
  528. fk := nodeKey(repo, vl.versions[0].node, name)
  529. bs, err := snap.Get(fk, nil)
  530. if err != nil {
  531. panic(err)
  532. }
  533. gf, err := unmarshalTrunc(bs, truncate)
  534. if err != nil {
  535. panic(err)
  536. }
  537. if gf.IsDeleted() && !have {
  538. // We don't need deleted files that we don't have
  539. continue
  540. }
  541. if debug {
  542. l.Debugf("need repo=%q node=%v name=%q need=%v have=%v haveV=%d globalV=%d", repo, protocol.NodeIDFromBytes(node), name, need, have, haveVersion, vl.versions[0].version)
  543. }
  544. if cont := fn(gf); !cont {
  545. return
  546. }
  547. }
  548. }
  549. }
  550. func ldbListRepos(db *leveldb.DB) []string {
  551. defer runtime.GC()
  552. start := []byte{keyTypeGlobal}
  553. limit := []byte{keyTypeGlobal + 1}
  554. snap, err := db.GetSnapshot()
  555. if err != nil {
  556. panic(err)
  557. }
  558. defer snap.Release()
  559. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  560. defer dbi.Release()
  561. repoExists := make(map[string]bool)
  562. for dbi.Next() {
  563. repo := string(globalKeyRepo(dbi.Key()))
  564. if !repoExists[repo] {
  565. repoExists[repo] = true
  566. }
  567. }
  568. repos := make([]string, 0, len(repoExists))
  569. for k := range repoExists {
  570. repos = append(repos, k)
  571. }
  572. sort.Strings(repos)
  573. return repos
  574. }
  575. func ldbDropRepo(db *leveldb.DB, repo []byte) {
  576. defer runtime.GC()
  577. snap, err := db.GetSnapshot()
  578. if err != nil {
  579. panic(err)
  580. }
  581. defer snap.Release()
  582. // Remove all items related to the given repo from the node->file bucket
  583. start := []byte{keyTypeNode}
  584. limit := []byte{keyTypeNode + 1}
  585. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  586. for dbi.Next() {
  587. itemRepo := nodeKeyRepo(dbi.Key())
  588. if bytes.Compare(repo, itemRepo) == 0 {
  589. db.Delete(dbi.Key(), nil)
  590. }
  591. }
  592. dbi.Release()
  593. // Remove all items related to the given repo from the global bucket
  594. start = []byte{keyTypeGlobal}
  595. limit = []byte{keyTypeGlobal + 1}
  596. dbi = snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  597. for dbi.Next() {
  598. itemRepo := globalKeyRepo(dbi.Key())
  599. if bytes.Compare(repo, itemRepo) == 0 {
  600. db.Delete(dbi.Key(), nil)
  601. }
  602. }
  603. dbi.Release()
  604. }
  605. func unmarshalTrunc(bs []byte, truncate bool) (protocol.FileIntf, error) {
  606. if truncate {
  607. var tf protocol.FileInfoTruncated
  608. err := tf.UnmarshalXDR(bs)
  609. return tf, err
  610. } else {
  611. var tf protocol.FileInfo
  612. err := tf.UnmarshalXDR(bs)
  613. return tf, err
  614. }
  615. }