lowlevel.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "os"
  9. "strings"
  10. "sync/atomic"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. "github.com/syndtr/goleveldb/leveldb/errors"
  13. "github.com/syndtr/goleveldb/leveldb/opt"
  14. "github.com/syndtr/goleveldb/leveldb/storage"
  15. )
  // Tuning parameters shared by the database open helpers and the batch
  // writer in this file.
  const (
  	// dbMaxOpenFiles is used as leveldb's OpenFilesCacheCapacity.
  	dbMaxOpenFiles = 100

  	// dbWriteBuffer is used as leveldb's WriteBuffer size (16 MiB).
  	dbWriteBuffer = 16 * 1024 * 1024

  	// dbFlushBatch is the batch size above which checkFlush writes the
  	// batch out. It is a quarter of the write buffer, leaving some leeway
  	// for any leveldb in-memory optimizations.
  	dbFlushBatch = dbWriteBuffer / 4
  )
  21. // Lowlevel is the lowest level database interface. It has a very simple
  22. // purpose: hold the actual *leveldb.DB database, and the in-memory state
  23. // that belong to that database. In the same way that a single on disk
  24. // database can only be opened once, there should be only one Lowlevel for
  25. // any given *leveldb.DB.
  26. type Lowlevel struct {
  27. committed int64 // atomic, must come first
  28. *leveldb.DB
  29. location string
  30. folderIdx *smallIndex
  31. deviceIdx *smallIndex
  32. }
  33. // Open attempts to open the database at the given location, and runs
  34. // recovery on it if opening fails. Worst case, if recovery is not possible,
  35. // the database is erased and created from scratch.
  36. func Open(location string) (*Lowlevel, error) {
  37. opts := &opt.Options{
  38. OpenFilesCacheCapacity: dbMaxOpenFiles,
  39. WriteBuffer: dbWriteBuffer,
  40. }
  41. return open(location, opts)
  42. }
  43. // OpenRO attempts to open the database at the given location, read only.
  44. func OpenRO(location string) (*Lowlevel, error) {
  45. opts := &opt.Options{
  46. OpenFilesCacheCapacity: dbMaxOpenFiles,
  47. ReadOnly: true,
  48. }
  49. return open(location, opts)
  50. }
  51. func open(location string, opts *opt.Options) (*Lowlevel, error) {
  52. db, err := leveldb.OpenFile(location, opts)
  53. if leveldbIsCorrupted(err) {
  54. db, err = leveldb.RecoverFile(location, opts)
  55. }
  56. if leveldbIsCorrupted(err) {
  57. // The database is corrupted, and we've tried to recover it but it
  58. // didn't work. At this point there isn't much to do beyond dropping
  59. // the database and reindexing...
  60. l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
  61. if err := os.RemoveAll(location); err != nil {
  62. return nil, errorSuggestion{err, "failed to delete corrupted database"}
  63. }
  64. db, err = leveldb.OpenFile(location, opts)
  65. }
  66. if err != nil {
  67. return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
  68. }
  69. return NewLowlevel(db, location), nil
  70. }
  71. // OpenMemory returns a new Lowlevel referencing an in-memory database.
  72. func OpenMemory() *Lowlevel {
  73. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  74. return NewLowlevel(db, "<memory>")
  75. }
  76. // ListFolders returns the list of folders currently in the database
  77. func (db *Lowlevel) ListFolders() []string {
  78. return db.folderIdx.Values()
  79. }
  80. // Committed returns the number of items committed to the database since startup
  81. func (db *Lowlevel) Committed() int64 {
  82. return atomic.LoadInt64(&db.committed)
  83. }
  84. func (db *Lowlevel) Put(key, val []byte, wo *opt.WriteOptions) error {
  85. atomic.AddInt64(&db.committed, 1)
  86. return db.DB.Put(key, val, wo)
  87. }
  88. func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error {
  89. atomic.AddInt64(&db.committed, 1)
  90. return db.DB.Delete(key, wo)
  91. }
  92. // NewLowlevel wraps the given *leveldb.DB into a *lowlevel
  93. func NewLowlevel(db *leveldb.DB, location string) *Lowlevel {
  94. return &Lowlevel{
  95. DB: db,
  96. location: location,
  97. folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
  98. deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
  99. }
  100. }
  101. // A "better" version of leveldb's errors.IsCorrupted.
  102. func leveldbIsCorrupted(err error) bool {
  103. switch {
  104. case err == nil:
  105. return false
  106. case errors.IsCorrupted(err):
  107. return true
  108. case strings.Contains(err.Error(), "corrupted"):
  109. return true
  110. }
  111. return false
  112. }
  113. type batch struct {
  114. *leveldb.Batch
  115. db *Lowlevel
  116. }
  117. func (db *Lowlevel) newBatch() *batch {
  118. return &batch{
  119. Batch: new(leveldb.Batch),
  120. db: db,
  121. }
  122. }
  123. // checkFlush flushes and resets the batch if its size exceeds dbFlushBatch.
  124. func (b *batch) checkFlush() {
  125. if len(b.Dump()) > dbFlushBatch {
  126. b.flush()
  127. b.Reset()
  128. }
  129. }
  130. func (b *batch) flush() {
  131. if err := b.db.Write(b.Batch, nil); err != nil {
  132. panic(err)
  133. }
  134. }