leveldb.go 24 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. //go:generate -command genxdr go run ../../Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
  7. //go:generate genxdr -o leveldb_xdr.go leveldb.go
  8. package db
  9. import (
  10. "bytes"
  11. "fmt"
  12. "runtime"
  13. "sort"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sync"
  16. "github.com/syndtr/goleveldb/leveldb"
  17. "github.com/syndtr/goleveldb/leveldb/iterator"
  18. "github.com/syndtr/goleveldb/leveldb/opt"
  19. "github.com/syndtr/goleveldb/leveldb/util"
  20. )
  21. var (
  22. clockTick int64
  23. clockMut = sync.NewMutex()
  24. )
  25. func clock(v int64) int64 {
  26. clockMut.Lock()
  27. defer clockMut.Unlock()
  28. if v > clockTick {
  29. clockTick = v + 1
  30. } else {
  31. clockTick++
  32. }
  33. return clockTick
  34. }
  35. const (
  36. KeyTypeDevice = iota
  37. KeyTypeGlobal
  38. KeyTypeBlock
  39. KeyTypeDeviceStatistic
  40. KeyTypeFolderStatistic
  41. KeyTypeVirtualMtime
  42. )
  43. type fileVersion struct {
  44. version protocol.Vector
  45. device []byte
  46. }
  47. type versionList struct {
  48. versions []fileVersion
  49. }
  50. func (l versionList) String() string {
  51. var b bytes.Buffer
  52. var id protocol.DeviceID
  53. b.WriteString("{")
  54. for i, v := range l.versions {
  55. if i > 0 {
  56. b.WriteString(", ")
  57. }
  58. copy(id[:], v.device)
  59. fmt.Fprintf(&b, "{%d, %v}", v.version, id)
  60. }
  61. b.WriteString("}")
  62. return b.String()
  63. }
  64. type fileList []protocol.FileInfo
  65. func (l fileList) Len() int {
  66. return len(l)
  67. }
  68. func (l fileList) Swap(a, b int) {
  69. l[a], l[b] = l[b], l[a]
  70. }
  71. func (l fileList) Less(a, b int) bool {
  72. return l[a].Name < l[b].Name
  73. }
  74. type dbReader interface {
  75. Get([]byte, *opt.ReadOptions) ([]byte, error)
  76. }
  77. type dbWriter interface {
  78. Put([]byte, []byte)
  79. Delete([]byte)
  80. }
  81. // Flush batches to disk when they contain this many records.
  82. const batchFlushSize = 64
  83. // deviceKey returns a byte slice encoding the following information:
  84. // keyTypeDevice (1 byte)
  85. // folder (64 bytes)
  86. // device (32 bytes)
  87. // name (variable size)
  88. func deviceKey(folder, device, file []byte) []byte {
  89. return deviceKeyInto(nil, folder, device, file)
  90. }
  91. func deviceKeyInto(k []byte, folder, device, file []byte) []byte {
  92. reqLen := 1 + 64 + 32 + len(file)
  93. if len(k) < reqLen {
  94. k = make([]byte, reqLen)
  95. }
  96. k[0] = KeyTypeDevice
  97. if len(folder) > 64 {
  98. panic("folder name too long")
  99. }
  100. copy(k[1:], []byte(folder))
  101. copy(k[1+64:], device[:])
  102. copy(k[1+64+32:], []byte(file))
  103. return k[:reqLen]
  104. }
  105. func deviceKeyName(key []byte) []byte {
  106. return key[1+64+32:]
  107. }
  108. func deviceKeyFolder(key []byte) []byte {
  109. folder := key[1 : 1+64]
  110. izero := bytes.IndexByte(folder, 0)
  111. if izero < 0 {
  112. return folder
  113. }
  114. return folder[:izero]
  115. }
  116. func deviceKeyDevice(key []byte) []byte {
  117. return key[1+64 : 1+64+32]
  118. }
  119. // globalKey returns a byte slice encoding the following information:
  120. // keyTypeGlobal (1 byte)
  121. // folder (64 bytes)
  122. // name (variable size)
  123. func globalKey(folder, file []byte) []byte {
  124. k := make([]byte, 1+64+len(file))
  125. k[0] = KeyTypeGlobal
  126. if len(folder) > 64 {
  127. panic("folder name too long")
  128. }
  129. copy(k[1:], []byte(folder))
  130. copy(k[1+64:], []byte(file))
  131. return k
  132. }
  133. func globalKeyName(key []byte) []byte {
  134. return key[1+64:]
  135. }
  136. func globalKeyFolder(key []byte) []byte {
  137. folder := key[1 : 1+64]
  138. izero := bytes.IndexByte(folder, 0)
  139. if izero < 0 {
  140. return folder
  141. }
  142. return folder[:izero]
  143. }
  144. type deletionHandler func(db dbReader, batch dbWriter, folder, device, name []byte, dbi iterator.Iterator) int64
  145. func ldbGenericReplace(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo, deleteFn deletionHandler) int64 {
  146. runtime.GC()
  147. sort.Sort(fileList(fs)) // sort list on name, same as in the database
  148. start := deviceKey(folder, device, nil) // before all folder/device files
  149. limit := deviceKey(folder, device, []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
  150. batch := new(leveldb.Batch)
  151. if debugDB {
  152. l.Debugf("new batch %p", batch)
  153. }
  154. snap, err := db.GetSnapshot()
  155. if err != nil {
  156. panic(err)
  157. }
  158. if debugDB {
  159. l.Debugf("created snapshot %p", snap)
  160. }
  161. defer func() {
  162. if debugDB {
  163. l.Debugf("close snapshot %p", snap)
  164. }
  165. snap.Release()
  166. }()
  167. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  168. defer dbi.Release()
  169. moreDb := dbi.Next()
  170. fsi := 0
  171. var maxLocalVer int64
  172. for {
  173. var newName, oldName []byte
  174. moreFs := fsi < len(fs)
  175. if !moreDb && !moreFs {
  176. break
  177. }
  178. if moreFs {
  179. newName = []byte(fs[fsi].Name)
  180. }
  181. if moreDb {
  182. oldName = deviceKeyName(dbi.Key())
  183. }
  184. cmp := bytes.Compare(newName, oldName)
  185. if debugDB {
  186. l.Debugf("generic replace; folder=%q device=%v moreFs=%v moreDb=%v cmp=%d newName=%q oldName=%q", folder, protocol.DeviceIDFromBytes(device), moreFs, moreDb, cmp, newName, oldName)
  187. }
  188. switch {
  189. case moreFs && (!moreDb || cmp == -1):
  190. if debugDB {
  191. l.Debugln("generic replace; missing - insert")
  192. }
  193. // Database is missing this file. Insert it.
  194. if lv := ldbInsert(batch, folder, device, fs[fsi]); lv > maxLocalVer {
  195. maxLocalVer = lv
  196. }
  197. if fs[fsi].IsInvalid() {
  198. ldbRemoveFromGlobal(snap, batch, folder, device, newName)
  199. } else {
  200. ldbUpdateGlobal(snap, batch, folder, device, fs[fsi])
  201. }
  202. fsi++
  203. case moreFs && moreDb && cmp == 0:
  204. // File exists on both sides - compare versions. We might get an
  205. // update with the same version and different flags if a device has
  206. // marked a file as invalid, so handle that too.
  207. if debugDB {
  208. l.Debugln("generic replace; exists - compare")
  209. }
  210. var ef FileInfoTruncated
  211. ef.UnmarshalXDR(dbi.Value())
  212. if !fs[fsi].Version.Equal(ef.Version) || fs[fsi].Flags != ef.Flags {
  213. if debugDB {
  214. l.Debugln("generic replace; differs - insert")
  215. }
  216. if lv := ldbInsert(batch, folder, device, fs[fsi]); lv > maxLocalVer {
  217. maxLocalVer = lv
  218. }
  219. if fs[fsi].IsInvalid() {
  220. ldbRemoveFromGlobal(snap, batch, folder, device, newName)
  221. } else {
  222. ldbUpdateGlobal(snap, batch, folder, device, fs[fsi])
  223. }
  224. } else if debugDB {
  225. l.Debugln("generic replace; equal - ignore")
  226. }
  227. fsi++
  228. moreDb = dbi.Next()
  229. case moreDb && (!moreFs || cmp == 1):
  230. if debugDB {
  231. l.Debugln("generic replace; exists - remove")
  232. }
  233. if lv := deleteFn(snap, batch, folder, device, oldName, dbi); lv > maxLocalVer {
  234. maxLocalVer = lv
  235. }
  236. moreDb = dbi.Next()
  237. }
  238. // Write out and reuse the batch every few records, to avoid the batch
  239. // growing too large and thus allocating unnecessarily much memory.
  240. if batch.Len() > batchFlushSize {
  241. if debugDB {
  242. l.Debugf("db.Write %p", batch)
  243. }
  244. err = db.Write(batch, nil)
  245. if err != nil {
  246. panic(err)
  247. }
  248. batch.Reset()
  249. }
  250. }
  251. if debugDB {
  252. l.Debugf("db.Write %p", batch)
  253. }
  254. err = db.Write(batch, nil)
  255. if err != nil {
  256. panic(err)
  257. }
  258. return maxLocalVer
  259. }
  260. func ldbReplace(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) int64 {
  261. // TODO: Return the remaining maxLocalVer?
  262. return ldbGenericReplace(db, folder, device, fs, func(db dbReader, batch dbWriter, folder, device, name []byte, dbi iterator.Iterator) int64 {
  263. // Database has a file that we are missing. Remove it.
  264. if debugDB {
  265. l.Debugf("delete; folder=%q device=%v name=%q", folder, protocol.DeviceIDFromBytes(device), name)
  266. }
  267. ldbRemoveFromGlobal(db, batch, folder, device, name)
  268. if debugDB {
  269. l.Debugf("batch.Delete %p %x", batch, dbi.Key())
  270. }
  271. batch.Delete(dbi.Key())
  272. return 0
  273. })
  274. }
  275. func ldbUpdate(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) int64 {
  276. runtime.GC()
  277. batch := new(leveldb.Batch)
  278. if debugDB {
  279. l.Debugf("new batch %p", batch)
  280. }
  281. snap, err := db.GetSnapshot()
  282. if err != nil {
  283. panic(err)
  284. }
  285. if debugDB {
  286. l.Debugf("created snapshot %p", snap)
  287. }
  288. defer func() {
  289. if debugDB {
  290. l.Debugf("close snapshot %p", snap)
  291. }
  292. snap.Release()
  293. }()
  294. var maxLocalVer int64
  295. var fk []byte
  296. for _, f := range fs {
  297. name := []byte(f.Name)
  298. fk = deviceKeyInto(fk[:cap(fk)], folder, device, name)
  299. if debugDB {
  300. l.Debugf("snap.Get %p %x", snap, fk)
  301. }
  302. bs, err := snap.Get(fk, nil)
  303. if err == leveldb.ErrNotFound {
  304. if lv := ldbInsert(batch, folder, device, f); lv > maxLocalVer {
  305. maxLocalVer = lv
  306. }
  307. if f.IsInvalid() {
  308. ldbRemoveFromGlobal(snap, batch, folder, device, name)
  309. } else {
  310. ldbUpdateGlobal(snap, batch, folder, device, f)
  311. }
  312. continue
  313. }
  314. var ef FileInfoTruncated
  315. err = ef.UnmarshalXDR(bs)
  316. if err != nil {
  317. panic(err)
  318. }
  319. // Flags might change without the version being bumped when we set the
  320. // invalid flag on an existing file.
  321. if !ef.Version.Equal(f.Version) || ef.Flags != f.Flags {
  322. if lv := ldbInsert(batch, folder, device, f); lv > maxLocalVer {
  323. maxLocalVer = lv
  324. }
  325. if f.IsInvalid() {
  326. ldbRemoveFromGlobal(snap, batch, folder, device, name)
  327. } else {
  328. ldbUpdateGlobal(snap, batch, folder, device, f)
  329. }
  330. }
  331. // Write out and reuse the batch every few records, to avoid the batch
  332. // growing too large and thus allocating unnecessarily much memory.
  333. if batch.Len() > batchFlushSize {
  334. if debugDB {
  335. l.Debugf("db.Write %p", batch)
  336. }
  337. err = db.Write(batch, nil)
  338. if err != nil {
  339. panic(err)
  340. }
  341. batch.Reset()
  342. }
  343. }
  344. if debugDB {
  345. l.Debugf("db.Write %p", batch)
  346. }
  347. err = db.Write(batch, nil)
  348. if err != nil {
  349. panic(err)
  350. }
  351. return maxLocalVer
  352. }
  353. func ldbInsert(batch dbWriter, folder, device []byte, file protocol.FileInfo) int64 {
  354. if debugDB {
  355. l.Debugf("insert; folder=%q device=%v %v", folder, protocol.DeviceIDFromBytes(device), file)
  356. }
  357. if file.LocalVersion == 0 {
  358. file.LocalVersion = clock(0)
  359. }
  360. name := []byte(file.Name)
  361. nk := deviceKey(folder, device, name)
  362. if debugDB {
  363. l.Debugf("batch.Put %p %x", batch, nk)
  364. }
  365. batch.Put(nk, file.MustMarshalXDR())
  366. return file.LocalVersion
  367. }
  368. // ldbUpdateGlobal adds this device+version to the version list for the given
  369. // file. If the device is already present in the list, the version is updated.
  370. // If the file does not have an entry in the global list, it is created.
  371. func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device []byte, file protocol.FileInfo) bool {
  372. if debugDB {
  373. l.Debugf("update global; folder=%q device=%v file=%q version=%d", folder, protocol.DeviceIDFromBytes(device), file.Name, file.Version)
  374. }
  375. name := []byte(file.Name)
  376. gk := globalKey(folder, name)
  377. svl, err := db.Get(gk, nil)
  378. if err != nil && err != leveldb.ErrNotFound {
  379. panic(err)
  380. }
  381. var fl versionList
  382. // Remove the device from the current version list
  383. if svl != nil {
  384. err = fl.UnmarshalXDR(svl)
  385. if err != nil {
  386. panic(err)
  387. }
  388. for i := range fl.versions {
  389. if bytes.Compare(fl.versions[i].device, device) == 0 {
  390. if fl.versions[i].version.Equal(file.Version) {
  391. // No need to do anything
  392. return false
  393. }
  394. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  395. break
  396. }
  397. }
  398. }
  399. nv := fileVersion{
  400. device: device,
  401. version: file.Version,
  402. }
  403. // Find a position in the list to insert this file. The file at the front
  404. // of the list is the newer, the "global".
  405. for i := range fl.versions {
  406. switch fl.versions[i].version.Compare(file.Version) {
  407. case protocol.Equal, protocol.Lesser:
  408. // The version at this point in the list is equal to or lesser
  409. // ("older") than us. We insert ourselves in front of it.
  410. fl.versions = insertVersion(fl.versions, i, nv)
  411. goto done
  412. case protocol.ConcurrentLesser, protocol.ConcurrentGreater:
  413. // The version at this point is in conflict with us. We must pull
  414. // the actual file metadata to determine who wins. If we win, we
  415. // insert ourselves in front of the loser here. (The "Lesser" and
  416. // "Greater" in the condition above is just based on the device
  417. // IDs in the version vector, which is not the only thing we use
  418. // to determine the winner.)
  419. of, ok := ldbGet(db, folder, fl.versions[i].device, name)
  420. if !ok {
  421. panic("file referenced in version list does not exist")
  422. }
  423. if file.WinsConflict(of) {
  424. fl.versions = insertVersion(fl.versions, i, nv)
  425. goto done
  426. }
  427. }
  428. }
  429. // We didn't find a position for an insert above, so append to the end.
  430. fl.versions = append(fl.versions, nv)
  431. done:
  432. if debugDB {
  433. l.Debugf("batch.Put %p %x", batch, gk)
  434. l.Debugf("new global after update: %v", fl)
  435. }
  436. batch.Put(gk, fl.MustMarshalXDR())
  437. return true
  438. }
  439. func insertVersion(vl []fileVersion, i int, v fileVersion) []fileVersion {
  440. t := append(vl, fileVersion{})
  441. copy(t[i+1:], t[i:])
  442. t[i] = v
  443. return t
  444. }
  445. // ldbRemoveFromGlobal removes the device from the global version list for the
  446. // given file. If the version list is empty after this, the file entry is
  447. // removed entirely.
  448. func ldbRemoveFromGlobal(db dbReader, batch dbWriter, folder, device, file []byte) {
  449. if debugDB {
  450. l.Debugf("remove from global; folder=%q device=%v file=%q", folder, protocol.DeviceIDFromBytes(device), file)
  451. }
  452. gk := globalKey(folder, file)
  453. svl, err := db.Get(gk, nil)
  454. if err != nil {
  455. // We might be called to "remove" a global version that doesn't exist
  456. // if the first update for the file is already marked invalid.
  457. return
  458. }
  459. var fl versionList
  460. err = fl.UnmarshalXDR(svl)
  461. if err != nil {
  462. panic(err)
  463. }
  464. for i := range fl.versions {
  465. if bytes.Compare(fl.versions[i].device, device) == 0 {
  466. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  467. break
  468. }
  469. }
  470. if len(fl.versions) == 0 {
  471. if debugDB {
  472. l.Debugf("batch.Delete %p %x", batch, gk)
  473. }
  474. batch.Delete(gk)
  475. } else {
  476. if debugDB {
  477. l.Debugf("batch.Put %p %x", batch, gk)
  478. l.Debugf("new global after remove: %v", fl)
  479. }
  480. batch.Put(gk, fl.MustMarshalXDR())
  481. }
  482. }
  483. func ldbWithHave(db *leveldb.DB, folder, device []byte, truncate bool, fn Iterator) {
  484. start := deviceKey(folder, device, nil) // before all folder/device files
  485. limit := deviceKey(folder, device, []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
  486. snap, err := db.GetSnapshot()
  487. if err != nil {
  488. panic(err)
  489. }
  490. if debugDB {
  491. l.Debugf("created snapshot %p", snap)
  492. }
  493. defer func() {
  494. if debugDB {
  495. l.Debugf("close snapshot %p", snap)
  496. }
  497. snap.Release()
  498. }()
  499. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  500. defer dbi.Release()
  501. for dbi.Next() {
  502. f, err := unmarshalTrunc(dbi.Value(), truncate)
  503. if err != nil {
  504. panic(err)
  505. }
  506. if cont := fn(f); !cont {
  507. return
  508. }
  509. }
  510. }
  511. func ldbWithAllFolderTruncated(db *leveldb.DB, folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  512. runtime.GC()
  513. start := deviceKey(folder, nil, nil) // before all folder/device files
  514. limit := deviceKey(folder, protocol.LocalDeviceID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
  515. snap, err := db.GetSnapshot()
  516. if err != nil {
  517. panic(err)
  518. }
  519. if debugDB {
  520. l.Debugf("created snapshot %p", snap)
  521. }
  522. defer func() {
  523. if debugDB {
  524. l.Debugf("close snapshot %p", snap)
  525. }
  526. snap.Release()
  527. }()
  528. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  529. defer dbi.Release()
  530. for dbi.Next() {
  531. device := deviceKeyDevice(dbi.Key())
  532. var f FileInfoTruncated
  533. err := f.UnmarshalXDR(dbi.Value())
  534. if err != nil {
  535. panic(err)
  536. }
  537. switch f.Name {
  538. case "", ".", "..", "/": // A few obviously invalid filenames
  539. l.Infof("Dropping invalid filename %q from database", f.Name)
  540. batch := new(leveldb.Batch)
  541. ldbRemoveFromGlobal(db, batch, folder, device, nil)
  542. batch.Delete(dbi.Key())
  543. db.Write(batch, nil)
  544. continue
  545. }
  546. if cont := fn(device, f); !cont {
  547. return
  548. }
  549. }
  550. }
  551. func ldbGet(db dbReader, folder, device, file []byte) (protocol.FileInfo, bool) {
  552. nk := deviceKey(folder, device, file)
  553. bs, err := db.Get(nk, nil)
  554. if err == leveldb.ErrNotFound {
  555. return protocol.FileInfo{}, false
  556. }
  557. if err != nil {
  558. panic(err)
  559. }
  560. var f protocol.FileInfo
  561. err = f.UnmarshalXDR(bs)
  562. if err != nil {
  563. panic(err)
  564. }
  565. return f, true
  566. }
  567. func ldbGetGlobal(db *leveldb.DB, folder, file []byte, truncate bool) (FileIntf, bool) {
  568. k := globalKey(folder, file)
  569. snap, err := db.GetSnapshot()
  570. if err != nil {
  571. panic(err)
  572. }
  573. if debugDB {
  574. l.Debugf("created snapshot %p", snap)
  575. }
  576. defer func() {
  577. if debugDB {
  578. l.Debugf("close snapshot %p", snap)
  579. }
  580. snap.Release()
  581. }()
  582. if debugDB {
  583. l.Debugf("snap.Get %p %x", snap, k)
  584. }
  585. bs, err := snap.Get(k, nil)
  586. if err == leveldb.ErrNotFound {
  587. return nil, false
  588. }
  589. if err != nil {
  590. panic(err)
  591. }
  592. var vl versionList
  593. err = vl.UnmarshalXDR(bs)
  594. if err != nil {
  595. panic(err)
  596. }
  597. if len(vl.versions) == 0 {
  598. l.Debugln(k)
  599. panic("no versions?")
  600. }
  601. k = deviceKey(folder, vl.versions[0].device, file)
  602. if debugDB {
  603. l.Debugf("snap.Get %p %x", snap, k)
  604. }
  605. bs, err = snap.Get(k, nil)
  606. if err != nil {
  607. panic(err)
  608. }
  609. fi, err := unmarshalTrunc(bs, truncate)
  610. if err != nil {
  611. panic(err)
  612. }
  613. return fi, true
  614. }
  615. func ldbWithGlobal(db *leveldb.DB, folder, prefix []byte, truncate bool, fn Iterator) {
  616. runtime.GC()
  617. snap, err := db.GetSnapshot()
  618. if err != nil {
  619. panic(err)
  620. }
  621. if debugDB {
  622. l.Debugf("created snapshot %p", snap)
  623. }
  624. defer func() {
  625. if debugDB {
  626. l.Debugf("close snapshot %p", snap)
  627. }
  628. snap.Release()
  629. }()
  630. dbi := snap.NewIterator(util.BytesPrefix(globalKey(folder, prefix)), nil)
  631. defer dbi.Release()
  632. var fk []byte
  633. for dbi.Next() {
  634. var vl versionList
  635. err := vl.UnmarshalXDR(dbi.Value())
  636. if err != nil {
  637. panic(err)
  638. }
  639. if len(vl.versions) == 0 {
  640. l.Debugln(dbi.Key())
  641. panic("no versions?")
  642. }
  643. name := globalKeyName(dbi.Key())
  644. fk = deviceKeyInto(fk[:cap(fk)], folder, vl.versions[0].device, name)
  645. if debugDB {
  646. l.Debugf("snap.Get %p %x", snap, fk)
  647. }
  648. bs, err := snap.Get(fk, nil)
  649. if err != nil {
  650. l.Debugf("folder: %q (%x)", folder, folder)
  651. l.Debugf("key: %q (%x)", dbi.Key(), dbi.Key())
  652. l.Debugf("vl: %v", vl)
  653. l.Debugf("vl.versions[0].device: %x", vl.versions[0].device)
  654. l.Debugf("name: %q (%x)", name, name)
  655. l.Debugf("fk: %q", fk)
  656. l.Debugf("fk: %x %x %x", fk[1:1+64], fk[1+64:1+64+32], fk[1+64+32:])
  657. panic(err)
  658. }
  659. f, err := unmarshalTrunc(bs, truncate)
  660. if err != nil {
  661. panic(err)
  662. }
  663. if cont := fn(f); !cont {
  664. return
  665. }
  666. }
  667. }
  668. func ldbAvailability(db *leveldb.DB, folder, file []byte) []protocol.DeviceID {
  669. k := globalKey(folder, file)
  670. bs, err := db.Get(k, nil)
  671. if err == leveldb.ErrNotFound {
  672. return nil
  673. }
  674. if err != nil {
  675. panic(err)
  676. }
  677. var vl versionList
  678. err = vl.UnmarshalXDR(bs)
  679. if err != nil {
  680. panic(err)
  681. }
  682. var devices []protocol.DeviceID
  683. for _, v := range vl.versions {
  684. if !v.version.Equal(vl.versions[0].version) {
  685. break
  686. }
  687. n := protocol.DeviceIDFromBytes(v.device)
  688. devices = append(devices, n)
  689. }
  690. return devices
  691. }
  692. func ldbWithNeed(db *leveldb.DB, folder, device []byte, truncate bool, fn Iterator) {
  693. runtime.GC()
  694. start := globalKey(folder, nil)
  695. limit := globalKey(folder, []byte{0xff, 0xff, 0xff, 0xff})
  696. snap, err := db.GetSnapshot()
  697. if err != nil {
  698. panic(err)
  699. }
  700. if debugDB {
  701. l.Debugf("created snapshot %p", snap)
  702. }
  703. defer func() {
  704. if debugDB {
  705. l.Debugf("close snapshot %p", snap)
  706. }
  707. snap.Release()
  708. }()
  709. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  710. defer dbi.Release()
  711. var fk []byte
  712. nextFile:
  713. for dbi.Next() {
  714. var vl versionList
  715. err := vl.UnmarshalXDR(dbi.Value())
  716. if err != nil {
  717. panic(err)
  718. }
  719. if len(vl.versions) == 0 {
  720. l.Debugln(dbi.Key())
  721. panic("no versions?")
  722. }
  723. have := false // If we have the file, any version
  724. need := false // If we have a lower version of the file
  725. var haveVersion protocol.Vector
  726. for _, v := range vl.versions {
  727. if bytes.Compare(v.device, device) == 0 {
  728. have = true
  729. haveVersion = v.version
  730. // XXX: This marks Concurrent (i.e. conflicting) changes as
  731. // needs. Maybe we should do that, but it needs special
  732. // handling in the puller.
  733. need = !v.version.GreaterEqual(vl.versions[0].version)
  734. break
  735. }
  736. }
  737. if need || !have {
  738. name := globalKeyName(dbi.Key())
  739. needVersion := vl.versions[0].version
  740. nextVersion:
  741. for i := range vl.versions {
  742. if !vl.versions[i].version.Equal(needVersion) {
  743. // We haven't found a valid copy of the file with the needed version.
  744. continue nextFile
  745. }
  746. fk = deviceKeyInto(fk[:cap(fk)], folder, vl.versions[i].device, name)
  747. if debugDB {
  748. l.Debugf("snap.Get %p %x", snap, fk)
  749. }
  750. bs, err := snap.Get(fk, nil)
  751. if err != nil {
  752. var id protocol.DeviceID
  753. copy(id[:], device)
  754. l.Debugf("device: %v", id)
  755. l.Debugf("need: %v, have: %v", need, have)
  756. l.Debugf("key: %q (%x)", dbi.Key(), dbi.Key())
  757. l.Debugf("vl: %v", vl)
  758. l.Debugf("i: %v", i)
  759. l.Debugf("fk: %q (%x)", fk, fk)
  760. l.Debugf("name: %q (%x)", name, name)
  761. panic(err)
  762. }
  763. gf, err := unmarshalTrunc(bs, truncate)
  764. if err != nil {
  765. panic(err)
  766. }
  767. if gf.IsInvalid() {
  768. // The file is marked invalid for whatever reason, don't use it.
  769. continue nextVersion
  770. }
  771. if gf.IsDeleted() && !have {
  772. // We don't need deleted files that we don't have
  773. continue nextFile
  774. }
  775. if debugDB {
  776. l.Debugf("need folder=%q device=%v name=%q need=%v have=%v haveV=%d globalV=%d", folder, protocol.DeviceIDFromBytes(device), name, need, have, haveVersion, vl.versions[0].version)
  777. }
  778. if cont := fn(gf); !cont {
  779. return
  780. }
  781. // This file is handled, no need to look further in the version list
  782. continue nextFile
  783. }
  784. }
  785. }
  786. }
  787. func ldbListFolders(db *leveldb.DB) []string {
  788. runtime.GC()
  789. snap, err := db.GetSnapshot()
  790. if err != nil {
  791. panic(err)
  792. }
  793. if debugDB {
  794. l.Debugf("created snapshot %p", snap)
  795. }
  796. defer func() {
  797. if debugDB {
  798. l.Debugf("close snapshot %p", snap)
  799. }
  800. snap.Release()
  801. }()
  802. dbi := snap.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  803. defer dbi.Release()
  804. folderExists := make(map[string]bool)
  805. for dbi.Next() {
  806. folder := string(globalKeyFolder(dbi.Key()))
  807. if !folderExists[folder] {
  808. folderExists[folder] = true
  809. }
  810. }
  811. folders := make([]string, 0, len(folderExists))
  812. for k := range folderExists {
  813. folders = append(folders, k)
  814. }
  815. sort.Strings(folders)
  816. return folders
  817. }
  818. func ldbDropFolder(db *leveldb.DB, folder []byte) {
  819. runtime.GC()
  820. snap, err := db.GetSnapshot()
  821. if err != nil {
  822. panic(err)
  823. }
  824. if debugDB {
  825. l.Debugf("created snapshot %p", snap)
  826. }
  827. defer func() {
  828. if debugDB {
  829. l.Debugf("close snapshot %p", snap)
  830. }
  831. snap.Release()
  832. }()
  833. // Remove all items related to the given folder from the device->file bucket
  834. dbi := snap.NewIterator(util.BytesPrefix([]byte{KeyTypeDevice}), nil)
  835. for dbi.Next() {
  836. itemFolder := deviceKeyFolder(dbi.Key())
  837. if bytes.Compare(folder, itemFolder) == 0 {
  838. db.Delete(dbi.Key(), nil)
  839. }
  840. }
  841. dbi.Release()
  842. // Remove all items related to the given folder from the global bucket
  843. dbi = snap.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  844. for dbi.Next() {
  845. itemFolder := globalKeyFolder(dbi.Key())
  846. if bytes.Compare(folder, itemFolder) == 0 {
  847. db.Delete(dbi.Key(), nil)
  848. }
  849. }
  850. dbi.Release()
  851. }
  852. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  853. if truncate {
  854. var tf FileInfoTruncated
  855. err := tf.UnmarshalXDR(bs)
  856. return tf, err
  857. }
  858. var tf protocol.FileInfo
  859. err := tf.UnmarshalXDR(bs)
  860. return tf, err
  861. }
  862. func ldbCheckGlobals(db *leveldb.DB, folder []byte) {
  863. defer runtime.GC()
  864. snap, err := db.GetSnapshot()
  865. if err != nil {
  866. panic(err)
  867. }
  868. if debugDB {
  869. l.Debugf("created snapshot %p", snap)
  870. }
  871. defer func() {
  872. if debugDB {
  873. l.Debugf("close snapshot %p", snap)
  874. }
  875. snap.Release()
  876. }()
  877. start := globalKey(folder, nil)
  878. limit := globalKey(folder, []byte{0xff, 0xff, 0xff, 0xff})
  879. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  880. defer dbi.Release()
  881. batch := new(leveldb.Batch)
  882. if debugDB {
  883. l.Debugf("new batch %p", batch)
  884. }
  885. var fk []byte
  886. for dbi.Next() {
  887. gk := dbi.Key()
  888. var vl versionList
  889. err := vl.UnmarshalXDR(dbi.Value())
  890. if err != nil {
  891. panic(err)
  892. }
  893. // Check the global version list for consistency. An issue in previous
  894. // versions of goleveldb could result in reordered writes so that
  895. // there are global entries pointing to no longer existing files. Here
  896. // we find those and clear them out.
  897. name := globalKeyName(gk)
  898. var newVL versionList
  899. for _, version := range vl.versions {
  900. fk = deviceKeyInto(fk[:cap(fk)], folder, version.device, name)
  901. if debugDB {
  902. l.Debugf("snap.Get %p %x", snap, fk)
  903. }
  904. _, err := snap.Get(fk, nil)
  905. if err == leveldb.ErrNotFound {
  906. continue
  907. }
  908. if err != nil {
  909. panic(err)
  910. }
  911. newVL.versions = append(newVL.versions, version)
  912. }
  913. if len(newVL.versions) != len(vl.versions) {
  914. l.Infof("db repair: rewriting global version list for %x %x", gk[1:1+64], gk[1+64:])
  915. batch.Put(dbi.Key(), newVL.MustMarshalXDR())
  916. }
  917. }
  918. if debugDB {
  919. l.Infoln("db check completed for %q", folder)
  920. }
  921. db.Write(batch, nil)
  922. }