leveldb_dbinstance.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "sort"
  10. "github.com/syncthing/syncthing/lib/protocol"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. "github.com/syndtr/goleveldb/leveldb/iterator"
  13. "github.com/syndtr/goleveldb/leveldb/util"
  14. )
  15. type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator) int64
  16. type dbInstance struct {
  17. *leveldb.DB
  18. }
  19. func newDBInstance(db *leveldb.DB) *dbInstance {
  20. return &dbInstance{
  21. DB: db,
  22. }
  23. }
  24. func (db *dbInstance) genericReplace(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker, deleteFn deletionHandler) int64 {
  25. sort.Sort(fileList(fs)) // sort list on name, same as in the database
  26. start := db.deviceKey(folder, device, nil) // before all folder/device files
  27. limit := db.deviceKey(folder, device, []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
  28. t := db.newReadWriteTransaction()
  29. defer t.close()
  30. dbi := t.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  31. defer dbi.Release()
  32. moreDb := dbi.Next()
  33. fsi := 0
  34. var maxLocalVer int64
  35. isLocalDevice := bytes.Equal(device, protocol.LocalDeviceID[:])
  36. for {
  37. var newName, oldName []byte
  38. moreFs := fsi < len(fs)
  39. if !moreDb && !moreFs {
  40. break
  41. }
  42. if moreFs {
  43. newName = []byte(fs[fsi].Name)
  44. }
  45. if moreDb {
  46. oldName = db.deviceKeyName(dbi.Key())
  47. }
  48. cmp := bytes.Compare(newName, oldName)
  49. l.Debugf("generic replace; folder=%q device=%v moreFs=%v moreDb=%v cmp=%d newName=%q oldName=%q", folder, protocol.DeviceIDFromBytes(device), moreFs, moreDb, cmp, newName, oldName)
  50. switch {
  51. case moreFs && (!moreDb || cmp == -1):
  52. l.Debugln("generic replace; missing - insert")
  53. // Database is missing this file. Insert it.
  54. if lv := t.insertFile(folder, device, fs[fsi]); lv > maxLocalVer {
  55. maxLocalVer = lv
  56. }
  57. if isLocalDevice {
  58. localSize.addFile(fs[fsi])
  59. }
  60. if fs[fsi].IsInvalid() {
  61. t.removeFromGlobal(folder, device, newName, globalSize)
  62. } else {
  63. t.updateGlobal(folder, device, fs[fsi], globalSize)
  64. }
  65. fsi++
  66. case moreFs && moreDb && cmp == 0:
  67. // File exists on both sides - compare versions. We might get an
  68. // update with the same version and different flags if a device has
  69. // marked a file as invalid, so handle that too.
  70. l.Debugln("generic replace; exists - compare")
  71. var ef FileInfoTruncated
  72. ef.UnmarshalXDR(dbi.Value())
  73. if !fs[fsi].Version.Equal(ef.Version) || fs[fsi].Flags != ef.Flags {
  74. l.Debugln("generic replace; differs - insert")
  75. if lv := t.insertFile(folder, device, fs[fsi]); lv > maxLocalVer {
  76. maxLocalVer = lv
  77. }
  78. if isLocalDevice {
  79. localSize.removeFile(ef)
  80. localSize.addFile(fs[fsi])
  81. }
  82. if fs[fsi].IsInvalid() {
  83. t.removeFromGlobal(folder, device, newName, globalSize)
  84. } else {
  85. t.updateGlobal(folder, device, fs[fsi], globalSize)
  86. }
  87. } else {
  88. l.Debugln("generic replace; equal - ignore")
  89. }
  90. fsi++
  91. moreDb = dbi.Next()
  92. case moreDb && (!moreFs || cmp == 1):
  93. l.Debugln("generic replace; exists - remove")
  94. if lv := deleteFn(t, folder, device, oldName, dbi); lv > maxLocalVer {
  95. maxLocalVer = lv
  96. }
  97. moreDb = dbi.Next()
  98. }
  99. // Write out and reuse the batch every few records, to avoid the batch
  100. // growing too large and thus allocating unnecessarily much memory.
  101. t.checkFlush()
  102. }
  103. return maxLocalVer
  104. }
  105. func (db *dbInstance) replace(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker) int64 {
  106. // TODO: Return the remaining maxLocalVer?
  107. return db.genericReplace(folder, device, fs, localSize, globalSize, func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator) int64 {
  108. // Database has a file that we are missing. Remove it.
  109. l.Debugf("delete; folder=%q device=%v name=%q", folder, protocol.DeviceIDFromBytes(device), name)
  110. t.removeFromGlobal(folder, device, name, globalSize)
  111. t.Delete(dbi.Key())
  112. return 0
  113. })
  114. }
  115. func (db *dbInstance) updateFiles(folder, device []byte, fs []protocol.FileInfo, localSize, globalSize *sizeTracker) int64 {
  116. t := db.newReadWriteTransaction()
  117. defer t.close()
  118. var maxLocalVer int64
  119. var fk []byte
  120. isLocalDevice := bytes.Equal(device, protocol.LocalDeviceID[:])
  121. for _, f := range fs {
  122. name := []byte(f.Name)
  123. fk = db.deviceKeyInto(fk[:cap(fk)], folder, device, name)
  124. bs, err := t.Get(fk, nil)
  125. if err == leveldb.ErrNotFound {
  126. if isLocalDevice {
  127. localSize.addFile(f)
  128. }
  129. if lv := t.insertFile(folder, device, f); lv > maxLocalVer {
  130. maxLocalVer = lv
  131. }
  132. if f.IsInvalid() {
  133. t.removeFromGlobal(folder, device, name, globalSize)
  134. } else {
  135. t.updateGlobal(folder, device, f, globalSize)
  136. }
  137. continue
  138. }
  139. var ef FileInfoTruncated
  140. err = ef.UnmarshalXDR(bs)
  141. if err != nil {
  142. panic(err)
  143. }
  144. // Flags might change without the version being bumped when we set the
  145. // invalid flag on an existing file.
  146. if !ef.Version.Equal(f.Version) || ef.Flags != f.Flags {
  147. if isLocalDevice {
  148. localSize.removeFile(ef)
  149. localSize.addFile(f)
  150. }
  151. if lv := t.insertFile(folder, device, f); lv > maxLocalVer {
  152. maxLocalVer = lv
  153. }
  154. if f.IsInvalid() {
  155. t.removeFromGlobal(folder, device, name, globalSize)
  156. } else {
  157. t.updateGlobal(folder, device, f, globalSize)
  158. }
  159. }
  160. // Write out and reuse the batch every few records, to avoid the batch
  161. // growing too large and thus allocating unnecessarily much memory.
  162. t.checkFlush()
  163. }
  164. return maxLocalVer
  165. }
  166. func (db *dbInstance) withHave(folder, device []byte, truncate bool, fn Iterator) {
  167. start := db.deviceKey(folder, device, nil) // before all folder/device files
  168. limit := db.deviceKey(folder, device, []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
  169. t := db.newReadOnlyTransaction()
  170. defer t.close()
  171. dbi := t.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  172. defer dbi.Release()
  173. for dbi.Next() {
  174. f, err := unmarshalTrunc(dbi.Value(), truncate)
  175. if err != nil {
  176. panic(err)
  177. }
  178. if cont := fn(f); !cont {
  179. return
  180. }
  181. }
  182. }
  183. func (db *dbInstance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  184. start := db.deviceKey(folder, nil, nil) // before all folder/device files
  185. limit := db.deviceKey(folder, protocol.LocalDeviceID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files
  186. t := db.newReadWriteTransaction()
  187. defer t.close()
  188. dbi := t.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  189. defer dbi.Release()
  190. for dbi.Next() {
  191. device := db.deviceKeyDevice(dbi.Key())
  192. var f FileInfoTruncated
  193. err := f.UnmarshalXDR(dbi.Value())
  194. if err != nil {
  195. panic(err)
  196. }
  197. switch f.Name {
  198. case "", ".", "..", "/": // A few obviously invalid filenames
  199. l.Infof("Dropping invalid filename %q from database", f.Name)
  200. t.removeFromGlobal(folder, device, nil, nil)
  201. t.Delete(dbi.Key())
  202. t.checkFlush()
  203. continue
  204. }
  205. if cont := fn(device, f); !cont {
  206. return
  207. }
  208. }
  209. }
  210. func (db *dbInstance) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
  211. return getFile(db, db.deviceKey(folder, device, file))
  212. }
  213. func (db *dbInstance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
  214. k := db.globalKey(folder, file)
  215. t := db.newReadOnlyTransaction()
  216. defer t.close()
  217. bs, err := t.Get(k, nil)
  218. if err == leveldb.ErrNotFound {
  219. return nil, false
  220. }
  221. if err != nil {
  222. panic(err)
  223. }
  224. var vl versionList
  225. err = vl.UnmarshalXDR(bs)
  226. if err != nil {
  227. panic(err)
  228. }
  229. if len(vl.versions) == 0 {
  230. l.Debugln(k)
  231. panic("no versions?")
  232. }
  233. k = db.deviceKey(folder, vl.versions[0].device, file)
  234. bs, err = t.Get(k, nil)
  235. if err != nil {
  236. panic(err)
  237. }
  238. fi, err := unmarshalTrunc(bs, truncate)
  239. if err != nil {
  240. panic(err)
  241. }
  242. return fi, true
  243. }
  244. func (db *dbInstance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
  245. t := db.newReadOnlyTransaction()
  246. defer t.close()
  247. dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, prefix)), nil)
  248. defer dbi.Release()
  249. var fk []byte
  250. for dbi.Next() {
  251. var vl versionList
  252. err := vl.UnmarshalXDR(dbi.Value())
  253. if err != nil {
  254. panic(err)
  255. }
  256. if len(vl.versions) == 0 {
  257. l.Debugln(dbi.Key())
  258. panic("no versions?")
  259. }
  260. name := db.globalKeyName(dbi.Key())
  261. fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.versions[0].device, name)
  262. bs, err := t.Get(fk, nil)
  263. if err != nil {
  264. l.Debugf("folder: %q (%x)", folder, folder)
  265. l.Debugf("key: %q (%x)", dbi.Key(), dbi.Key())
  266. l.Debugf("vl: %v", vl)
  267. l.Debugf("vl.versions[0].device: %x", vl.versions[0].device)
  268. l.Debugf("name: %q (%x)", name, name)
  269. l.Debugf("fk: %q", fk)
  270. l.Debugf("fk: %x %x %x", fk[1:1+64], fk[1+64:1+64+32], fk[1+64+32:])
  271. panic(err)
  272. }
  273. f, err := unmarshalTrunc(bs, truncate)
  274. if err != nil {
  275. panic(err)
  276. }
  277. if cont := fn(f); !cont {
  278. return
  279. }
  280. }
  281. }
  282. func (db *dbInstance) availability(folder, file []byte) []protocol.DeviceID {
  283. k := db.globalKey(folder, file)
  284. bs, err := db.Get(k, nil)
  285. if err == leveldb.ErrNotFound {
  286. return nil
  287. }
  288. if err != nil {
  289. panic(err)
  290. }
  291. var vl versionList
  292. err = vl.UnmarshalXDR(bs)
  293. if err != nil {
  294. panic(err)
  295. }
  296. var devices []protocol.DeviceID
  297. for _, v := range vl.versions {
  298. if !v.version.Equal(vl.versions[0].version) {
  299. break
  300. }
  301. n := protocol.DeviceIDFromBytes(v.device)
  302. devices = append(devices, n)
  303. }
  304. return devices
  305. }
  306. func (db *dbInstance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
  307. start := db.globalKey(folder, nil)
  308. limit := db.globalKey(folder, []byte{0xff, 0xff, 0xff, 0xff})
  309. t := db.newReadOnlyTransaction()
  310. defer t.close()
  311. dbi := t.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  312. defer dbi.Release()
  313. var fk []byte
  314. nextFile:
  315. for dbi.Next() {
  316. var vl versionList
  317. err := vl.UnmarshalXDR(dbi.Value())
  318. if err != nil {
  319. panic(err)
  320. }
  321. if len(vl.versions) == 0 {
  322. l.Debugln(dbi.Key())
  323. panic("no versions?")
  324. }
  325. have := false // If we have the file, any version
  326. need := false // If we have a lower version of the file
  327. var haveVersion protocol.Vector
  328. for _, v := range vl.versions {
  329. if bytes.Compare(v.device, device) == 0 {
  330. have = true
  331. haveVersion = v.version
  332. // XXX: This marks Concurrent (i.e. conflicting) changes as
  333. // needs. Maybe we should do that, but it needs special
  334. // handling in the puller.
  335. need = !v.version.GreaterEqual(vl.versions[0].version)
  336. break
  337. }
  338. }
  339. if need || !have {
  340. name := db.globalKeyName(dbi.Key())
  341. needVersion := vl.versions[0].version
  342. nextVersion:
  343. for i := range vl.versions {
  344. if !vl.versions[i].version.Equal(needVersion) {
  345. // We haven't found a valid copy of the file with the needed version.
  346. continue nextFile
  347. }
  348. fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.versions[i].device, name)
  349. bs, err := t.Get(fk, nil)
  350. if err != nil {
  351. var id protocol.DeviceID
  352. copy(id[:], device)
  353. l.Debugf("device: %v", id)
  354. l.Debugf("need: %v, have: %v", need, have)
  355. l.Debugf("key: %q (%x)", dbi.Key(), dbi.Key())
  356. l.Debugf("vl: %v", vl)
  357. l.Debugf("i: %v", i)
  358. l.Debugf("fk: %q (%x)", fk, fk)
  359. l.Debugf("name: %q (%x)", name, name)
  360. panic(err)
  361. }
  362. gf, err := unmarshalTrunc(bs, truncate)
  363. if err != nil {
  364. panic(err)
  365. }
  366. if gf.IsInvalid() {
  367. // The file is marked invalid for whatever reason, don't use it.
  368. continue nextVersion
  369. }
  370. if gf.IsDeleted() && !have {
  371. // We don't need deleted files that we don't have
  372. continue nextFile
  373. }
  374. l.Debugf("need folder=%q device=%v name=%q need=%v have=%v haveV=%d globalV=%d", folder, protocol.DeviceIDFromBytes(device), name, need, have, haveVersion, vl.versions[0].version)
  375. if cont := fn(gf); !cont {
  376. return
  377. }
  378. // This file is handled, no need to look further in the version list
  379. continue nextFile
  380. }
  381. }
  382. }
  383. }
  384. func (db *dbInstance) listFolders() []string {
  385. t := db.newReadOnlyTransaction()
  386. defer t.close()
  387. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  388. defer dbi.Release()
  389. folderExists := make(map[string]bool)
  390. for dbi.Next() {
  391. folder := string(db.globalKeyFolder(dbi.Key()))
  392. if !folderExists[folder] {
  393. folderExists[folder] = true
  394. }
  395. }
  396. folders := make([]string, 0, len(folderExists))
  397. for k := range folderExists {
  398. folders = append(folders, k)
  399. }
  400. sort.Strings(folders)
  401. return folders
  402. }
  403. func (db *dbInstance) dropFolder(folder []byte) {
  404. t := db.newReadOnlyTransaction()
  405. defer t.close()
  406. // Remove all items related to the given folder from the device->file bucket
  407. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeDevice}), nil)
  408. for dbi.Next() {
  409. itemFolder := db.deviceKeyFolder(dbi.Key())
  410. if bytes.Compare(folder, itemFolder) == 0 {
  411. db.Delete(dbi.Key(), nil)
  412. }
  413. }
  414. dbi.Release()
  415. // Remove all items related to the given folder from the global bucket
  416. dbi = t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  417. for dbi.Next() {
  418. itemFolder := db.globalKeyFolder(dbi.Key())
  419. if bytes.Compare(folder, itemFolder) == 0 {
  420. db.Delete(dbi.Key(), nil)
  421. }
  422. }
  423. dbi.Release()
  424. }
  425. func (db *dbInstance) checkGlobals(folder []byte, globalSize *sizeTracker) {
  426. t := db.newReadWriteTransaction()
  427. defer t.close()
  428. start := db.globalKey(folder, nil)
  429. limit := db.globalKey(folder, []byte{0xff, 0xff, 0xff, 0xff})
  430. dbi := t.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  431. defer dbi.Release()
  432. var fk []byte
  433. for dbi.Next() {
  434. gk := dbi.Key()
  435. var vl versionList
  436. err := vl.UnmarshalXDR(dbi.Value())
  437. if err != nil {
  438. panic(err)
  439. }
  440. // Check the global version list for consistency. An issue in previous
  441. // versions of goleveldb could result in reordered writes so that
  442. // there are global entries pointing to no longer existing files. Here
  443. // we find those and clear them out.
  444. name := db.globalKeyName(gk)
  445. var newVL versionList
  446. for i, version := range vl.versions {
  447. fk = db.deviceKeyInto(fk[:cap(fk)], folder, version.device, name)
  448. _, err := t.Get(fk, nil)
  449. if err == leveldb.ErrNotFound {
  450. continue
  451. }
  452. if err != nil {
  453. panic(err)
  454. }
  455. newVL.versions = append(newVL.versions, version)
  456. if i == 0 {
  457. fi, ok := t.getFile(folder, version.device, name)
  458. if !ok {
  459. panic("nonexistent global master file")
  460. }
  461. globalSize.addFile(fi)
  462. }
  463. }
  464. if len(newVL.versions) != len(vl.versions) {
  465. t.Put(dbi.Key(), newVL.MustMarshalXDR())
  466. t.checkFlush()
  467. }
  468. }
  469. l.Debugf("db check completed for %q", folder)
  470. }
  471. // deviceKey returns a byte slice encoding the following information:
  472. // keyTypeDevice (1 byte)
  473. // folder (64 bytes)
  474. // device (32 bytes)
  475. // name (variable size)
  476. func (db *dbInstance) deviceKey(folder, device, file []byte) []byte {
  477. return db.deviceKeyInto(nil, folder, device, file)
  478. }
  479. func (db *dbInstance) deviceKeyInto(k []byte, folder, device, file []byte) []byte {
  480. reqLen := 1 + 64 + 32 + len(file)
  481. if len(k) < reqLen {
  482. k = make([]byte, reqLen)
  483. }
  484. k[0] = KeyTypeDevice
  485. if len(folder) > 64 {
  486. panic("folder name too long")
  487. }
  488. copy(k[1:], []byte(folder))
  489. copy(k[1+64:], device[:])
  490. copy(k[1+64+32:], []byte(file))
  491. return k[:reqLen]
  492. }
  493. func (db *dbInstance) deviceKeyName(key []byte) []byte {
  494. return key[1+64+32:]
  495. }
  496. func (db *dbInstance) deviceKeyFolder(key []byte) []byte {
  497. folder := key[1 : 1+64]
  498. izero := bytes.IndexByte(folder, 0)
  499. if izero < 0 {
  500. return folder
  501. }
  502. return folder[:izero]
  503. }
  504. func (db *dbInstance) deviceKeyDevice(key []byte) []byte {
  505. return key[1+64 : 1+64+32]
  506. }
  507. // globalKey returns a byte slice encoding the following information:
  508. // keyTypeGlobal (1 byte)
  509. // folder (64 bytes)
  510. // name (variable size)
  511. func (db *dbInstance) globalKey(folder, file []byte) []byte {
  512. k := make([]byte, 1+64+len(file))
  513. k[0] = KeyTypeGlobal
  514. if len(folder) > 64 {
  515. panic("folder name too long")
  516. }
  517. copy(k[1:], []byte(folder))
  518. copy(k[1+64:], []byte(file))
  519. return k
  520. }
  521. func (db *dbInstance) globalKeyName(key []byte) []byte {
  522. return key[1+64:]
  523. }
  524. func (db *dbInstance) globalKeyFolder(key []byte) []byte {
  525. folder := key[1 : 1+64]
  526. izero := bytes.IndexByte(folder, 0)
  527. if izero < 0 {
  528. return folder
  529. }
  530. return folder[:izero]
  531. }
  532. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  533. if truncate {
  534. var tf FileInfoTruncated
  535. err := tf.UnmarshalXDR(bs)
  536. return tf, err
  537. }
  538. var tf protocol.FileInfo
  539. err := tf.UnmarshalXDR(bs)
  540. return tf, err
  541. }