leveldb.go 15 KB


  1. package files
  2. import (
  3. "bytes"
  4. "sort"
  5. "sync"
  6. "github.com/calmh/syncthing/lamport"
  7. "github.com/calmh/syncthing/protocol"
  8. "github.com/syndtr/goleveldb/leveldb"
  9. "github.com/syndtr/goleveldb/leveldb/iterator"
  10. "github.com/syndtr/goleveldb/leveldb/opt"
  11. "github.com/syndtr/goleveldb/leveldb/util"
  12. )
  13. var (
  14. clockTick uint64
  15. clockMut sync.Mutex
  16. )
  17. func clock(v uint64) uint64 {
  18. clockMut.Lock()
  19. defer clockMut.Unlock()
  20. if v > clockTick {
  21. clockTick = v + 1
  22. } else {
  23. clockTick++
  24. }
  25. return clockTick
  26. }
  27. const (
  28. keyTypeNode = iota
  29. keyTypeGlobal
  30. )
  31. type fileVersion struct {
  32. version uint64
  33. node []byte
  34. }
  35. type versionList struct {
  36. versions []fileVersion
  37. }
  38. type fileList []protocol.FileInfo
  39. func (l fileList) Len() int {
  40. return len(l)
  41. }
  42. func (l fileList) Swap(a, b int) {
  43. l[a], l[b] = l[b], l[a]
  44. }
  45. func (l fileList) Less(a, b int) bool {
  46. return l[a].Name < l[b].Name
  47. }
  48. type dbReader interface {
  49. Get([]byte, *opt.ReadOptions) ([]byte, error)
  50. }
  51. type dbWriter interface {
  52. Put([]byte, []byte)
  53. Delete([]byte)
  54. }
/*

Database key layout. Each key is listed with its fields in order; the entry
below the "|" is the value stored under that key.

keyTypeNode (1 byte)
    repository (64 bytes)
    node (32 bytes)
    name (variable size)
        |
        scanner.File

keyTypeGlobal (1 byte)
    repository (64 bytes)
    name (variable size)
        |
        []fileVersion (sorted)

*/
  68. func nodeKey(repo, node, file []byte) []byte {
  69. k := make([]byte, 1+64+32+len(file))
  70. k[0] = keyTypeNode
  71. copy(k[1:], []byte(repo))
  72. copy(k[1+64:], node[:])
  73. copy(k[1+64+32:], []byte(file))
  74. return k
  75. }
  76. func globalKey(repo, file []byte) []byte {
  77. k := make([]byte, 1+64+len(file))
  78. k[0] = keyTypeGlobal
  79. copy(k[1:], []byte(repo))
  80. copy(k[1+64:], []byte(file))
  81. return k
  82. }
  83. func nodeKeyName(key []byte) []byte {
  84. return key[1+64+32:]
  85. }
  86. func nodeKeyRepo(key []byte) []byte {
  87. repo := key[1 : 1+64]
  88. izero := bytes.IndexByte(repo, 0)
  89. return repo[:izero]
  90. }
  91. func nodeKeyNode(key []byte) []byte {
  92. return key[1+64 : 1+64+32]
  93. }
  94. func globalKeyName(key []byte) []byte {
  95. return key[1+64:]
  96. }
  97. type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64
  98. type fileIterator func(f protocol.FileInfo) bool
  99. func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) uint64 {
  100. sort.Sort(fileList(fs)) // sort list on name, same as on disk
  101. start := nodeKey(repo, node, nil) // before all repo/node files
  102. limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  103. batch := new(leveldb.Batch)
  104. snap, err := db.GetSnapshot()
  105. if err != nil {
  106. panic(err)
  107. }
  108. defer snap.Release()
  109. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  110. defer dbi.Release()
  111. moreDb := dbi.Next()
  112. fsi := 0
  113. var maxLocalVer uint64
  114. for {
  115. var newName, oldName []byte
  116. moreFs := fsi < len(fs)
  117. if !moreDb && !moreFs {
  118. break
  119. }
  120. if !moreFs && deleteFn == nil {
  121. // We don't have any more updated files to process and deletion
  122. // has not been requested, so we can exit early
  123. break
  124. }
  125. if moreFs {
  126. newName = []byte(fs[fsi].Name)
  127. }
  128. if moreDb {
  129. oldName = nodeKeyName(dbi.Key())
  130. }
  131. cmp := bytes.Compare(newName, oldName)
  132. if debug {
  133. l.Debugf("generic replace; repo=%q node=%v moreFs=%v moreDb=%v cmp=%d newName=%q oldName=%q", repo, protocol.NodeIDFromBytes(node), moreFs, moreDb, cmp, newName, oldName)
  134. }
  135. switch {
  136. case moreFs && (!moreDb || cmp == -1):
  137. // Disk is missing this file. Insert it.
  138. if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
  139. maxLocalVer = lv
  140. }
  141. ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
  142. fsi++
  143. case moreFs && moreDb && cmp == 0:
  144. // File exists on both sides - compare versions.
  145. var ef protocol.FileInfo
  146. ef.UnmarshalXDR(dbi.Value())
  147. if fs[fsi].Version > ef.Version {
  148. if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
  149. maxLocalVer = lv
  150. }
  151. ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
  152. }
  153. // Iterate both sides.
  154. fsi++
  155. moreDb = dbi.Next()
  156. case moreDb && (!moreFs || cmp == 1):
  157. if deleteFn != nil {
  158. if lv := deleteFn(snap, batch, repo, node, oldName, dbi); lv > maxLocalVer {
  159. maxLocalVer = lv
  160. }
  161. }
  162. moreDb = dbi.Next()
  163. }
  164. }
  165. err = db.Write(batch, nil)
  166. if err != nil {
  167. panic(err)
  168. }
  169. return maxLocalVer
  170. }
  171. func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  172. // TODO: Return the remaining maxLocalVer?
  173. return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
  174. // Disk has files that we are missing. Remove it.
  175. if debug {
  176. l.Debugf("delete; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name)
  177. }
  178. ldbRemoveFromGlobal(db, batch, repo, node, name)
  179. batch.Delete(dbi.Key())
  180. return 0
  181. })
  182. }
  183. func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  184. return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
  185. var f protocol.FileInfo
  186. err := f.UnmarshalXDR(dbi.Value())
  187. if err != nil {
  188. panic(err)
  189. }
  190. if !protocol.IsDeleted(f.Flags) {
  191. if debug {
  192. l.Debugf("mark deleted; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name)
  193. }
  194. ts := clock(f.LocalVersion)
  195. f.Blocks = nil
  196. f.Version = lamport.Default.Tick(f.Version)
  197. f.Flags |= protocol.FlagDeleted
  198. f.LocalVersion = ts
  199. batch.Put(dbi.Key(), f.MarshalXDR())
  200. ldbUpdateGlobal(db, batch, repo, node, nodeKeyName(dbi.Key()), f.Version)
  201. return ts
  202. }
  203. return 0
  204. })
  205. }
  206. func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  207. batch := new(leveldb.Batch)
  208. snap, err := db.GetSnapshot()
  209. if err != nil {
  210. panic(err)
  211. }
  212. defer snap.Release()
  213. var maxLocalVer uint64
  214. for _, f := range fs {
  215. name := []byte(f.Name)
  216. fk := nodeKey(repo, node, name)
  217. bs, err := snap.Get(fk, nil)
  218. if err == leveldb.ErrNotFound {
  219. if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
  220. maxLocalVer = lv
  221. }
  222. ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
  223. continue
  224. }
  225. var ef protocol.FileInfo
  226. err = ef.UnmarshalXDR(bs)
  227. if err != nil {
  228. panic(err)
  229. }
  230. if ef.Version != f.Version {
  231. if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
  232. maxLocalVer = lv
  233. }
  234. ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
  235. }
  236. }
  237. err = db.Write(batch, nil)
  238. if err != nil {
  239. panic(err)
  240. }
  241. return maxLocalVer
  242. }
  243. func ldbInsert(batch dbWriter, repo, node, name []byte, file protocol.FileInfo) uint64 {
  244. if debug {
  245. l.Debugf("insert; repo=%q node=%v %v", repo, protocol.NodeIDFromBytes(node), file)
  246. }
  247. if file.LocalVersion == 0 {
  248. file.LocalVersion = clock(0)
  249. }
  250. nk := nodeKey(repo, node, name)
  251. batch.Put(nk, file.MarshalXDR())
  252. return file.LocalVersion
  253. }
  254. // ldbUpdateGlobal adds this node+version to the version list for the given
  255. // file. If the node is already present in the list, the version is updated.
  256. // If the file does not have an entry in the global list, it is created.
  257. func ldbUpdateGlobal(db dbReader, batch dbWriter, repo, node, file []byte, version uint64) bool {
  258. if debug {
  259. l.Debugf("update global; repo=%q node=%v file=%q version=%d", repo, protocol.NodeIDFromBytes(node), file, version)
  260. }
  261. gk := globalKey(repo, file)
  262. svl, err := db.Get(gk, nil)
  263. if err != nil && err != leveldb.ErrNotFound {
  264. panic(err)
  265. }
  266. var fl versionList
  267. nv := fileVersion{
  268. node: node,
  269. version: version,
  270. }
  271. if svl != nil {
  272. err = fl.UnmarshalXDR(svl)
  273. if err != nil {
  274. panic(err)
  275. }
  276. for i := range fl.versions {
  277. if bytes.Compare(fl.versions[i].node, node) == 0 {
  278. if fl.versions[i].version == version {
  279. // No need to do anything
  280. return false
  281. }
  282. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  283. break
  284. }
  285. }
  286. }
  287. for i := range fl.versions {
  288. if fl.versions[i].version <= version {
  289. t := append(fl.versions, fileVersion{})
  290. copy(t[i+1:], t[i:])
  291. t[i] = nv
  292. fl.versions = t
  293. goto done
  294. }
  295. }
  296. fl.versions = append(fl.versions, nv)
  297. done:
  298. batch.Put(gk, fl.MarshalXDR())
  299. return true
  300. }
  301. // ldbRemoveFromGlobal removes the node from the global version list for the
  302. // given file. If the version list is empty after this, the file entry is
  303. // removed entirely.
  304. func ldbRemoveFromGlobal(db dbReader, batch dbWriter, repo, node, file []byte) {
  305. if debug {
  306. l.Debugf("remove from global; repo=%q node=%v file=%q", repo, protocol.NodeIDFromBytes(node), file)
  307. }
  308. gk := globalKey(repo, file)
  309. svl, err := db.Get(gk, nil)
  310. if err != nil {
  311. panic(err)
  312. }
  313. var fl versionList
  314. err = fl.UnmarshalXDR(svl)
  315. if err != nil {
  316. panic(err)
  317. }
  318. for i := range fl.versions {
  319. if bytes.Compare(fl.versions[i].node, node) == 0 {
  320. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  321. break
  322. }
  323. }
  324. if len(fl.versions) == 0 {
  325. batch.Delete(gk)
  326. } else {
  327. batch.Put(gk, fl.MarshalXDR())
  328. }
  329. }
  330. func ldbWithHave(db *leveldb.DB, repo, node []byte, fn fileIterator) {
  331. start := nodeKey(repo, node, nil) // before all repo/node files
  332. limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  333. snap, err := db.GetSnapshot()
  334. if err != nil {
  335. panic(err)
  336. }
  337. defer snap.Release()
  338. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  339. defer dbi.Release()
  340. for dbi.Next() {
  341. var f protocol.FileInfo
  342. err := f.UnmarshalXDR(dbi.Value())
  343. if err != nil {
  344. panic(err)
  345. }
  346. if cont := fn(f); !cont {
  347. return
  348. }
  349. }
  350. }
  351. func ldbWithAllRepo(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol.FileInfo) bool) {
  352. start := nodeKey(repo, nil, nil) // before all repo/node files
  353. limit := nodeKey(repo, protocol.LocalNodeID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  354. snap, err := db.GetSnapshot()
  355. if err != nil {
  356. panic(err)
  357. }
  358. defer snap.Release()
  359. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  360. defer dbi.Release()
  361. for dbi.Next() {
  362. node := nodeKeyNode(dbi.Key())
  363. var f protocol.FileInfo
  364. err := f.UnmarshalXDR(dbi.Value())
  365. if err != nil {
  366. panic(err)
  367. }
  368. if cont := fn(node, f); !cont {
  369. return
  370. }
  371. }
  372. }
  373. /*
  374. func ldbCheckGlobalConsistency(db *leveldb.DB, repo []byte) {
  375. l.Debugf("Checking global consistency for %q", repo)
  376. start := nodeKey(repo, nil, nil) // before all repo/node files
  377. limit := nodeKey(repo, protocol.LocalNodeID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  378. snap, err := db.GetSnapshot()
  379. if err != nil {
  380. panic(err)
  381. }
  382. defer snap.Release()
  383. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  384. defer dbi.Release()
  385. batch := new(leveldb.Batch)
  386. i := 0
  387. for dbi.Next() {
  388. repo := nodeKeyRepo(dbi.Key())
  389. node := nodeKeyNode(dbi.Key())
  390. var f protocol.FileInfo
  391. err := f.UnmarshalXDR(dbi.Value())
  392. if err != nil {
  393. panic(err)
  394. }
  395. if ldbUpdateGlobal(snap, batch, repo, node, []byte(f.Name), f.Version) {
  396. var nodeID protocol.NodeID
  397. copy(nodeID[:], node)
  398. l.Debugf("fixed global for %q %s %q", repo, nodeID, f.Name)
  399. }
  400. i++
  401. }
  402. l.Debugln("Done", i)
  403. }
  404. */
  405. func ldbGet(db *leveldb.DB, repo, node, file []byte) protocol.FileInfo {
  406. nk := nodeKey(repo, node, file)
  407. bs, err := db.Get(nk, nil)
  408. if err == leveldb.ErrNotFound {
  409. return protocol.FileInfo{}
  410. }
  411. if err != nil {
  412. panic(err)
  413. }
  414. var f protocol.FileInfo
  415. err = f.UnmarshalXDR(bs)
  416. if err != nil {
  417. panic(err)
  418. }
  419. return f
  420. }
  421. func ldbGetGlobal(db *leveldb.DB, repo, file []byte) protocol.FileInfo {
  422. k := globalKey(repo, file)
  423. snap, err := db.GetSnapshot()
  424. if err != nil {
  425. panic(err)
  426. }
  427. defer snap.Release()
  428. bs, err := snap.Get(k, nil)
  429. if err == leveldb.ErrNotFound {
  430. return protocol.FileInfo{}
  431. }
  432. if err != nil {
  433. panic(err)
  434. }
  435. var vl versionList
  436. err = vl.UnmarshalXDR(bs)
  437. if err != nil {
  438. panic(err)
  439. }
  440. if len(vl.versions) == 0 {
  441. l.Debugln(k)
  442. panic("no versions?")
  443. }
  444. k = nodeKey(repo, vl.versions[0].node, file)
  445. bs, err = snap.Get(k, nil)
  446. if err != nil {
  447. panic(err)
  448. }
  449. var f protocol.FileInfo
  450. err = f.UnmarshalXDR(bs)
  451. if err != nil {
  452. panic(err)
  453. }
  454. return f
  455. }
  456. func ldbWithGlobal(db *leveldb.DB, repo []byte, fn fileIterator) {
  457. start := globalKey(repo, nil)
  458. limit := globalKey(repo, []byte{0xff, 0xff, 0xff, 0xff})
  459. snap, err := db.GetSnapshot()
  460. if err != nil {
  461. panic(err)
  462. }
  463. defer snap.Release()
  464. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  465. defer dbi.Release()
  466. for dbi.Next() {
  467. var vl versionList
  468. err := vl.UnmarshalXDR(dbi.Value())
  469. if err != nil {
  470. panic(err)
  471. }
  472. if len(vl.versions) == 0 {
  473. l.Debugln(dbi.Key())
  474. panic("no versions?")
  475. }
  476. fk := nodeKey(repo, vl.versions[0].node, globalKeyName(dbi.Key()))
  477. bs, err := snap.Get(fk, nil)
  478. if err != nil {
  479. panic(err)
  480. }
  481. var f protocol.FileInfo
  482. err = f.UnmarshalXDR(bs)
  483. if err != nil {
  484. panic(err)
  485. }
  486. if cont := fn(f); !cont {
  487. return
  488. }
  489. }
  490. }
  491. func ldbAvailability(db *leveldb.DB, repo, file []byte) []protocol.NodeID {
  492. k := globalKey(repo, file)
  493. bs, err := db.Get(k, nil)
  494. if err == leveldb.ErrNotFound {
  495. return nil
  496. }
  497. if err != nil {
  498. panic(err)
  499. }
  500. var vl versionList
  501. err = vl.UnmarshalXDR(bs)
  502. if err != nil {
  503. panic(err)
  504. }
  505. var nodes []protocol.NodeID
  506. for _, v := range vl.versions {
  507. if v.version != vl.versions[0].version {
  508. break
  509. }
  510. n := protocol.NodeIDFromBytes(v.node)
  511. nodes = append(nodes, n)
  512. }
  513. return nodes
  514. }
  515. func ldbWithNeed(db *leveldb.DB, repo, node []byte, fn fileIterator) {
  516. start := globalKey(repo, nil)
  517. limit := globalKey(repo, []byte{0xff, 0xff, 0xff, 0xff})
  518. snap, err := db.GetSnapshot()
  519. if err != nil {
  520. panic(err)
  521. }
  522. defer snap.Release()
  523. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  524. defer dbi.Release()
  525. for dbi.Next() {
  526. var vl versionList
  527. err := vl.UnmarshalXDR(dbi.Value())
  528. if err != nil {
  529. panic(err)
  530. }
  531. if len(vl.versions) == 0 {
  532. l.Debugln(dbi.Key())
  533. panic("no versions?")
  534. }
  535. have := false // If we have the file, any version
  536. need := false // If we have a lower version of the file
  537. var haveVersion uint64
  538. for _, v := range vl.versions {
  539. if bytes.Compare(v.node, node) == 0 {
  540. have = true
  541. haveVersion = v.version
  542. need = v.version < vl.versions[0].version
  543. break
  544. }
  545. }
  546. if need || !have {
  547. name := globalKeyName(dbi.Key())
  548. fk := nodeKey(repo, vl.versions[0].node, name)
  549. bs, err := snap.Get(fk, nil)
  550. if err != nil {
  551. panic(err)
  552. }
  553. var gf protocol.FileInfo
  554. err = gf.UnmarshalXDR(bs)
  555. if err != nil {
  556. panic(err)
  557. }
  558. if protocol.IsDeleted(gf.Flags) && !have {
  559. // We don't need deleted files that we don't have
  560. continue
  561. }
  562. if debug {
  563. l.Debugf("need repo=%q node=%v name=%q need=%v have=%v haveV=%d globalV=%d", repo, protocol.NodeIDFromBytes(node), name, need, have, haveVersion, vl.versions[0].version)
  564. }
  565. if cont := fn(gf); !cont {
  566. return
  567. }
  568. }
  569. }
  570. }