leveldb_dbinstance.go 19 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "fmt"
  11. "os"
  12. "sort"
  13. "strings"
  14. "sync/atomic"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. "github.com/syndtr/goleveldb/leveldb"
  17. "github.com/syndtr/goleveldb/leveldb/errors"
  18. "github.com/syndtr/goleveldb/leveldb/iterator"
  19. "github.com/syndtr/goleveldb/leveldb/opt"
  20. "github.com/syndtr/goleveldb/leveldb/storage"
  21. "github.com/syndtr/goleveldb/leveldb/util"
  22. )
  23. type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator)
  24. type Instance struct {
  25. committed int64 // this must be the first attribute in the struct to ensure 64 bit alignment on 32 bit plaforms
  26. *leveldb.DB
  27. location string
  28. folderIdx *smallIndex
  29. deviceIdx *smallIndex
  30. keyer keyer
  31. }
  32. func Open(file string) (*Instance, error) {
  33. opts := &opt.Options{
  34. OpenFilesCacheCapacity: 100,
  35. WriteBuffer: 4 << 20,
  36. }
  37. db, err := leveldb.OpenFile(file, opts)
  38. if leveldbIsCorrupted(err) {
  39. db, err = leveldb.RecoverFile(file, opts)
  40. }
  41. if leveldbIsCorrupted(err) {
  42. // The database is corrupted, and we've tried to recover it but it
  43. // didn't work. At this point there isn't much to do beyond dropping
  44. // the database and reindexing...
  45. l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
  46. if err := os.RemoveAll(file); err != nil {
  47. return nil, errorSuggestion{err, "failed to delete corrupted database"}
  48. }
  49. db, err = leveldb.OpenFile(file, opts)
  50. }
  51. if err != nil {
  52. return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
  53. }
  54. return newDBInstance(db, file)
  55. }
  56. func OpenMemory() *Instance {
  57. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  58. ldb, _ := newDBInstance(db, "<memory>")
  59. return ldb
  60. }
  61. func newDBInstance(db *leveldb.DB, location string) (*Instance, error) {
  62. i := &Instance{
  63. DB: db,
  64. location: location,
  65. folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
  66. deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
  67. }
  68. i.keyer = newDefaultKeyer(i.folderIdx, i.deviceIdx)
  69. err := i.updateSchema()
  70. return i, err
  71. }
  72. // Committed returns the number of items committed to the database since startup
  73. func (db *Instance) Committed() int64 {
  74. return atomic.LoadInt64(&db.committed)
  75. }
  76. // Location returns the filesystem path where the database is stored
  77. func (db *Instance) Location() string {
  78. return db.location
  79. }
  80. func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
  81. t := db.newReadWriteTransaction()
  82. defer t.close()
  83. var fk []byte
  84. var gk []byte
  85. for _, f := range fs {
  86. name := []byte(f.Name)
  87. fk = db.keyer.GenerateDeviceFileKey(fk, folder, device, name)
  88. // Get and unmarshal the file entry. If it doesn't exist or can't be
  89. // unmarshalled we'll add it as a new entry.
  90. bs, err := t.Get(fk, nil)
  91. var ef FileInfoTruncated
  92. if err == nil {
  93. err = ef.Unmarshal(bs)
  94. }
  95. // Local flags or the invalid bit might change without the version
  96. // being bumped. The IsInvalid() method handles both.
  97. if err == nil && ef.Version.Equal(f.Version) && ef.IsInvalid() == f.IsInvalid() {
  98. continue
  99. }
  100. devID := protocol.DeviceIDFromBytes(device)
  101. if err == nil {
  102. meta.removeFile(devID, ef)
  103. }
  104. meta.addFile(devID, f)
  105. t.insertFile(fk, folder, device, f)
  106. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  107. t.updateGlobal(gk, folder, device, f, meta)
  108. // Write out and reuse the batch every few records, to avoid the batch
  109. // growing too large and thus allocating unnecessarily much memory.
  110. t.checkFlush()
  111. }
  112. }
  113. func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) {
  114. t := db.newReadWriteTransaction()
  115. defer t.close()
  116. var sk []byte
  117. var dk []byte
  118. for _, f := range fs {
  119. sk = db.keyer.GenerateSequenceKey(sk, folder, f.Sequence)
  120. dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name))
  121. t.Put(sk, dk)
  122. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  123. t.checkFlush()
  124. }
  125. }
  126. func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
  127. t := db.newReadWriteTransaction()
  128. defer t.close()
  129. var sk []byte
  130. for _, f := range fs {
  131. t.Delete(db.keyer.GenerateSequenceKey(sk, folder, f.Sequence))
  132. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  133. t.checkFlush()
  134. }
  135. }
  136. func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
  137. if len(prefix) > 0 {
  138. unslashedPrefix := prefix
  139. if bytes.HasSuffix(prefix, []byte{'/'}) {
  140. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  141. } else {
  142. prefix = append(prefix, '/')
  143. }
  144. if f, ok := db.getFileTrunc(db.keyer.GenerateDeviceFileKey(nil, folder, device, unslashedPrefix), true); ok && !fn(f) {
  145. return
  146. }
  147. }
  148. t := db.newReadOnlyTransaction()
  149. defer t.close()
  150. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, device, prefix)), nil)
  151. defer dbi.Release()
  152. for dbi.Next() {
  153. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  154. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  155. return
  156. }
  157. // The iterator function may keep a reference to the unmarshalled
  158. // struct, which in turn references the buffer it was unmarshalled
  159. // from. dbi.Value() just returns an internal slice that it reuses, so
  160. // we need to copy it.
  161. f, err := unmarshalTrunc(append([]byte{}, dbi.Value()...), truncate)
  162. if err != nil {
  163. l.Debugln("unmarshal error:", err)
  164. continue
  165. }
  166. if !fn(f) {
  167. return
  168. }
  169. }
  170. }
  171. func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
  172. t := db.newReadOnlyTransaction()
  173. defer t.close()
  174. dbi := t.NewIterator(&util.Range{Start: db.keyer.GenerateSequenceKey(nil, folder, startSeq), Limit: db.keyer.GenerateSequenceKey(nil, folder, maxInt64)}, nil)
  175. defer dbi.Release()
  176. for dbi.Next() {
  177. f, ok := db.getFile(dbi.Value())
  178. if !ok {
  179. l.Debugln("missing file for sequence number", db.keyer.SequenceFromSequenceKey(dbi.Key()))
  180. continue
  181. }
  182. if shouldDebug() {
  183. key := dbi.Key()
  184. seq := int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:]))
  185. if f.Sequence != seq {
  186. panic(fmt.Sprintf("sequence index corruption, file sequence %d != expected %d", f.Sequence, seq))
  187. }
  188. }
  189. if !fn(f) {
  190. return
  191. }
  192. }
  193. }
  194. func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  195. t := db.newReadWriteTransaction()
  196. defer t.close()
  197. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil).WithoutName()), nil)
  198. defer dbi.Release()
  199. var gk []byte
  200. for dbi.Next() {
  201. device, ok := db.keyer.DeviceFromDeviceFileKey(dbi.Key())
  202. if !ok {
  203. // Not having the device in the index is bad. Clear it.
  204. t.Delete(dbi.Key())
  205. t.checkFlush()
  206. continue
  207. }
  208. var f FileInfoTruncated
  209. // The iterator function may keep a reference to the unmarshalled
  210. // struct, which in turn references the buffer it was unmarshalled
  211. // from. dbi.Value() just returns an internal slice that it reuses, so
  212. // we need to copy it.
  213. err := f.Unmarshal(append([]byte{}, dbi.Value()...))
  214. if err != nil {
  215. l.Debugln("unmarshal error:", err)
  216. continue
  217. }
  218. switch f.Name {
  219. case "", ".", "..", "/": // A few obviously invalid filenames
  220. l.Infof("Dropping invalid filename %q from database", f.Name)
  221. name := []byte(f.Name)
  222. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  223. t.removeFromGlobal(gk, folder, device, name, nil)
  224. t.Delete(dbi.Key())
  225. t.checkFlush()
  226. continue
  227. }
  228. if !fn(device, f) {
  229. return
  230. }
  231. }
  232. }
  233. func (db *Instance) getFile(key []byte) (protocol.FileInfo, bool) {
  234. if f, ok := db.getFileTrunc(key, false); ok {
  235. return f.(protocol.FileInfo), true
  236. }
  237. return protocol.FileInfo{}, false
  238. }
  239. func (db *Instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
  240. bs, err := db.Get(key, nil)
  241. if err == leveldb.ErrNotFound {
  242. return nil, false
  243. }
  244. if err != nil {
  245. l.Debugln("surprise error:", err)
  246. return nil, false
  247. }
  248. f, err := unmarshalTrunc(bs, trunc)
  249. if err != nil {
  250. l.Debugln("unmarshal error:", err)
  251. return nil, false
  252. }
  253. return f, true
  254. }
  255. func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
  256. t := db.newReadOnlyTransaction()
  257. defer t.close()
  258. _, _, f, ok := db.getGlobalInto(t, nil, nil, folder, file, truncate)
  259. return f, ok
  260. }
  261. func (db *Instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []byte, truncate bool) ([]byte, []byte, FileIntf, bool) {
  262. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, file)
  263. bs, err := t.Get(gk, nil)
  264. if err != nil {
  265. return gk, dk, nil, false
  266. }
  267. vl, ok := unmarshalVersionList(bs)
  268. if !ok {
  269. return gk, dk, nil, false
  270. }
  271. dk = db.keyer.GenerateDeviceFileKey(dk, folder, vl.Versions[0].Device, file)
  272. if fi, ok := db.getFileTrunc(dk, truncate); ok {
  273. return gk, dk, fi, true
  274. }
  275. return gk, dk, nil, false
  276. }
  277. func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
  278. if len(prefix) > 0 {
  279. unslashedPrefix := prefix
  280. if bytes.HasSuffix(prefix, []byte{'/'}) {
  281. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  282. } else {
  283. prefix = append(prefix, '/')
  284. }
  285. if f, ok := db.getGlobal(folder, unslashedPrefix, truncate); ok && !fn(f) {
  286. return
  287. }
  288. }
  289. t := db.newReadOnlyTransaction()
  290. defer t.close()
  291. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, prefix)), nil)
  292. defer dbi.Release()
  293. var fk []byte
  294. for dbi.Next() {
  295. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  296. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  297. return
  298. }
  299. vl, ok := unmarshalVersionList(dbi.Value())
  300. if !ok {
  301. continue
  302. }
  303. fk = db.keyer.GenerateDeviceFileKey(fk, folder, vl.Versions[0].Device, name)
  304. f, ok := db.getFileTrunc(fk, truncate)
  305. if !ok {
  306. continue
  307. }
  308. if !fn(f) {
  309. return
  310. }
  311. }
  312. }
  313. func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
  314. k := db.keyer.GenerateGlobalVersionKey(nil, folder, file)
  315. bs, err := db.Get(k, nil)
  316. if err == leveldb.ErrNotFound {
  317. return nil
  318. }
  319. if err != nil {
  320. l.Debugln("surprise error:", err)
  321. return nil
  322. }
  323. vl, ok := unmarshalVersionList(bs)
  324. if !ok {
  325. return nil
  326. }
  327. var devices []protocol.DeviceID
  328. for _, v := range vl.Versions {
  329. if !v.Version.Equal(vl.Versions[0].Version) {
  330. break
  331. }
  332. if v.Invalid {
  333. continue
  334. }
  335. n := protocol.DeviceIDFromBytes(v.Device)
  336. devices = append(devices, n)
  337. }
  338. return devices
  339. }
  340. func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
  341. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  342. db.withNeedLocal(folder, truncate, fn)
  343. return
  344. }
  345. t := db.newReadOnlyTransaction()
  346. defer t.close()
  347. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName()), nil)
  348. defer dbi.Release()
  349. var fk []byte
  350. for dbi.Next() {
  351. vl, ok := unmarshalVersionList(dbi.Value())
  352. if !ok {
  353. continue
  354. }
  355. haveFV, have := vl.Get(device)
  356. // XXX: This marks Concurrent (i.e. conflicting) changes as
  357. // needs. Maybe we should do that, but it needs special
  358. // handling in the puller.
  359. if have && haveFV.Version.GreaterEqual(vl.Versions[0].Version) {
  360. continue
  361. }
  362. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  363. needVersion := vl.Versions[0].Version
  364. needDevice := protocol.DeviceIDFromBytes(vl.Versions[0].Device)
  365. for i := range vl.Versions {
  366. if !vl.Versions[i].Version.Equal(needVersion) {
  367. // We haven't found a valid copy of the file with the needed version.
  368. break
  369. }
  370. if vl.Versions[i].Invalid {
  371. // The file is marked invalid, don't use it.
  372. continue
  373. }
  374. fk = db.keyer.GenerateDeviceFileKey(fk, folder, vl.Versions[i].Device, name)
  375. bs, err := t.Get(fk, nil)
  376. if err != nil {
  377. l.Debugln("surprise error:", err)
  378. continue
  379. }
  380. gf, err := unmarshalTrunc(bs, truncate)
  381. if err != nil {
  382. l.Debugln("unmarshal error:", err)
  383. continue
  384. }
  385. if gf.IsDeleted() && !have {
  386. // We don't need deleted files that we don't have
  387. break
  388. }
  389. l.Debugf("need folder=%q device=%v name=%q have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, have, haveFV.Invalid, haveFV.Version, needVersion, needDevice)
  390. if !fn(gf) {
  391. return
  392. }
  393. // This file is handled, no need to look further in the version list
  394. break
  395. }
  396. }
  397. }
  398. func (db *Instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
  399. t := db.newReadOnlyTransaction()
  400. defer t.close()
  401. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateNeedFileKey(nil, folder, nil).WithoutName()), nil)
  402. defer dbi.Release()
  403. var dk []byte
  404. var gk []byte
  405. var f FileIntf
  406. var ok bool
  407. for dbi.Next() {
  408. gk, dk, f, ok = db.getGlobalInto(t, gk, dk, folder, db.keyer.NameFromGlobalVersionKey(dbi.Key()), truncate)
  409. if !ok {
  410. continue
  411. }
  412. if !fn(f) {
  413. return
  414. }
  415. }
  416. }
  417. func (db *Instance) ListFolders() []string {
  418. t := db.newReadOnlyTransaction()
  419. defer t.close()
  420. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
  421. defer dbi.Release()
  422. folderExists := make(map[string]bool)
  423. for dbi.Next() {
  424. folder, ok := db.keyer.FolderFromGlobalVersionKey(dbi.Key())
  425. if ok && !folderExists[string(folder)] {
  426. folderExists[string(folder)] = true
  427. }
  428. }
  429. folders := make([]string, 0, len(folderExists))
  430. for k := range folderExists {
  431. folders = append(folders, k)
  432. }
  433. sort.Strings(folders)
  434. return folders
  435. }
  436. func (db *Instance) dropFolder(folder []byte) {
  437. t := db.newReadWriteTransaction()
  438. defer t.close()
  439. for _, key := range [][]byte{
  440. // Remove all items related to the given folder from the device->file bucket
  441. db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil).WithoutName(),
  442. // Remove all sequences related to the folder
  443. db.keyer.GenerateSequenceKey(nil, []byte(folder), 0).WithoutSequence(),
  444. // Remove all items related to the given folder from the global bucket
  445. db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName(),
  446. // Remove all needs related to the folder
  447. db.keyer.GenerateNeedFileKey(nil, folder, nil).WithoutName(),
  448. } {
  449. t.deleteKeyPrefix(key)
  450. }
  451. }
  452. func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
  453. t := db.newReadWriteTransaction()
  454. defer t.close()
  455. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)), nil)
  456. defer dbi.Release()
  457. var gk []byte
  458. for dbi.Next() {
  459. key := dbi.Key()
  460. name := db.keyer.NameFromDeviceFileKey(key)
  461. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  462. t.removeFromGlobal(gk, folder, device, name, meta)
  463. t.Delete(key)
  464. t.checkFlush()
  465. }
  466. }
  467. func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
  468. t := db.newReadWriteTransaction()
  469. defer t.close()
  470. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName()), nil)
  471. defer dbi.Release()
  472. var fk []byte
  473. for dbi.Next() {
  474. vl, ok := unmarshalVersionList(dbi.Value())
  475. if !ok {
  476. continue
  477. }
  478. // Check the global version list for consistency. An issue in previous
  479. // versions of goleveldb could result in reordered writes so that
  480. // there are global entries pointing to no longer existing files. Here
  481. // we find those and clear them out.
  482. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  483. var newVL VersionList
  484. for i, version := range vl.Versions {
  485. fk = db.keyer.GenerateDeviceFileKey(fk, folder, version.Device, name)
  486. _, err := t.Get(fk, nil)
  487. if err == leveldb.ErrNotFound {
  488. continue
  489. }
  490. if err != nil {
  491. l.Debugln("surprise error:", err)
  492. return
  493. }
  494. newVL.Versions = append(newVL.Versions, version)
  495. if i == 0 {
  496. if fi, ok := db.getFile(fk); ok {
  497. meta.addFile(protocol.GlobalDeviceID, fi)
  498. }
  499. }
  500. }
  501. if len(newVL.Versions) != len(vl.Versions) {
  502. t.Put(dbi.Key(), mustMarshal(&newVL))
  503. t.checkFlush()
  504. }
  505. }
  506. l.Debugf("db check completed for %q", folder)
  507. }
  508. func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
  509. key := db.keyer.GenerateIndexIDKey(nil, device, folder)
  510. cur, err := db.Get(key, nil)
  511. if err != nil {
  512. return 0
  513. }
  514. var id protocol.IndexID
  515. if err := id.Unmarshal(cur); err != nil {
  516. return 0
  517. }
  518. return id
  519. }
  520. func (db *Instance) setIndexID(device, folder []byte, id protocol.IndexID) {
  521. key := db.keyer.GenerateIndexIDKey(nil, device, folder)
  522. bs, _ := id.Marshal() // marshalling can't fail
  523. if err := db.Put(key, bs, nil); err != nil {
  524. panic("storing index ID: " + err.Error())
  525. }
  526. }
  527. // DropLocalDeltaIndexIDs removes all index IDs for the local device ID from
  528. // the database. This will cause a full index transmission on the next
  529. // connection.
  530. func (db *Instance) DropLocalDeltaIndexIDs() {
  531. db.dropDeltaIndexIDs(true)
  532. }
  533. // DropRemoteDeltaIndexIDs removes all index IDs for the other devices than
  534. // the local one from the database. This will cause them to send us a full
  535. // index on the next connection.
  536. func (db *Instance) DropRemoteDeltaIndexIDs() {
  537. db.dropDeltaIndexIDs(false)
  538. }
  539. func (db *Instance) dropDeltaIndexIDs(local bool) {
  540. t := db.newReadWriteTransaction()
  541. defer t.close()
  542. dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeIndexID}), nil)
  543. defer dbi.Release()
  544. for dbi.Next() {
  545. device, _ := db.keyer.DeviceFromIndexIDKey(dbi.Key())
  546. if bytes.Equal(device, protocol.LocalDeviceID[:]) == local {
  547. t.Delete(dbi.Key())
  548. }
  549. }
  550. }
  551. func (db *Instance) dropMtimes(folder []byte) {
  552. db.dropPrefix(db.keyer.GenerateMtimesKey(nil, folder))
  553. }
  554. func (db *Instance) dropFolderMeta(folder []byte) {
  555. db.dropPrefix(db.keyer.GenerateFolderMetaKey(nil, folder))
  556. }
  557. func (db *Instance) dropPrefix(prefix []byte) {
  558. t := db.newReadWriteTransaction()
  559. defer t.close()
  560. dbi := t.NewIterator(util.BytesPrefix(prefix), nil)
  561. defer dbi.Release()
  562. for dbi.Next() {
  563. t.Delete(dbi.Key())
  564. }
  565. }
  566. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  567. if truncate {
  568. var tf FileInfoTruncated
  569. err := tf.Unmarshal(bs)
  570. return tf, err
  571. }
  572. var tf protocol.FileInfo
  573. err := tf.Unmarshal(bs)
  574. return tf, err
  575. }
  576. func unmarshalVersionList(data []byte) (VersionList, bool) {
  577. var vl VersionList
  578. if err := vl.Unmarshal(data); err != nil {
  579. l.Debugln("unmarshal error:", err)
  580. return VersionList{}, false
  581. }
  582. if len(vl.Versions) == 0 {
  583. l.Debugln("empty version list")
  584. return VersionList{}, false
  585. }
  586. return vl, true
  587. }
  588. // A "better" version of leveldb's errors.IsCorrupted.
  589. func leveldbIsCorrupted(err error) bool {
  590. switch {
  591. case err == nil:
  592. return false
  593. case errors.IsCorrupted(err):
  594. return true
  595. case strings.Contains(err.Error(), "corrupted"):
  596. return true
  597. }
  598. return false
  599. }
  600. type errorSuggestion struct {
  601. inner error
  602. suggestion string
  603. }
  604. func (e errorSuggestion) Error() string {
  605. return fmt.Sprintf("%s (%s)", e.inner.Error(), e.suggestion)
  606. }