lowlevel.go 11 KB


  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "github.com/syndtr/goleveldb/leveldb"
  14. "github.com/syndtr/goleveldb/leveldb/errors"
  15. "github.com/syndtr/goleveldb/leveldb/iterator"
  16. "github.com/syndtr/goleveldb/leveldb/opt"
  17. "github.com/syndtr/goleveldb/leveldb/storage"
  18. "github.com/syndtr/goleveldb/leveldb/util"
  19. )
  20. const (
  21. dbMaxOpenFiles = 100
  22. dbWriteBuffer = 16 << 20
  23. )
  24. var (
  25. dbFlushBatch = debugEnvValue("WriteBuffer", dbWriteBuffer) / 4 // Some leeway for any leveldb in-memory optimizations
  26. )
  27. // Lowlevel is the lowest level database interface. It has a very simple
  28. // purpose: hold the actual *leveldb.DB database, and the in-memory state
  29. // that belong to that database. In the same way that a single on disk
  30. // database can only be opened once, there should be only one Lowlevel for
  31. // any given *leveldb.DB.
  32. type Lowlevel struct {
  33. committed int64 // atomic, must come first
  34. *leveldb.DB
  35. location string
  36. folderIdx *smallIndex
  37. deviceIdx *smallIndex
  38. closed bool
  39. closeMut *sync.RWMutex
  40. iterWG sync.WaitGroup
  41. }
  42. // Open attempts to open the database at the given location, and runs
  43. // recovery on it if opening fails. Worst case, if recovery is not possible,
  44. // the database is erased and created from scratch.
  45. func Open(location string) (*Lowlevel, error) {
  46. opts := &opt.Options{
  47. BlockCacheCapacity: debugEnvValue("BlockCacheCapacity", 0),
  48. BlockCacheEvictRemoved: debugEnvValue("BlockCacheEvictRemoved", 0) != 0,
  49. BlockRestartInterval: debugEnvValue("BlockRestartInterval", 0),
  50. BlockSize: debugEnvValue("BlockSize", 0),
  51. CompactionExpandLimitFactor: debugEnvValue("CompactionExpandLimitFactor", 0),
  52. CompactionGPOverlapsFactor: debugEnvValue("CompactionGPOverlapsFactor", 0),
  53. CompactionL0Trigger: debugEnvValue("CompactionL0Trigger", 0),
  54. CompactionSourceLimitFactor: debugEnvValue("CompactionSourceLimitFactor", 0),
  55. CompactionTableSize: debugEnvValue("CompactionTableSize", 0),
  56. CompactionTableSizeMultiplier: float64(debugEnvValue("CompactionTableSizeMultiplier", 0)) / 10.0,
  57. CompactionTotalSize: debugEnvValue("CompactionTotalSize", 0),
  58. CompactionTotalSizeMultiplier: float64(debugEnvValue("CompactionTotalSizeMultiplier", 0)) / 10.0,
  59. DisableBufferPool: debugEnvValue("DisableBufferPool", 0) != 0,
  60. DisableBlockCache: debugEnvValue("DisableBlockCache", 0) != 0,
  61. DisableCompactionBackoff: debugEnvValue("DisableCompactionBackoff", 0) != 0,
  62. DisableLargeBatchTransaction: debugEnvValue("DisableLargeBatchTransaction", 0) != 0,
  63. NoSync: debugEnvValue("NoSync", 0) != 0,
  64. NoWriteMerge: debugEnvValue("NoWriteMerge", 0) != 0,
  65. OpenFilesCacheCapacity: debugEnvValue("OpenFilesCacheCapacity", dbMaxOpenFiles),
  66. WriteBuffer: debugEnvValue("WriteBuffer", dbWriteBuffer),
  67. // The write slowdown and pause can be overridden, but even if they
  68. // are not and the compaction trigger is overridden we need to
  69. // adjust so that we don't pause writes for L0 compaction before we
  70. // even *start* L0 compaction...
  71. WriteL0SlowdownTrigger: debugEnvValue("WriteL0SlowdownTrigger", 2*debugEnvValue("CompactionL0Trigger", opt.DefaultCompactionL0Trigger)),
  72. WriteL0PauseTrigger: debugEnvValue("WriteL0SlowdownTrigger", 3*debugEnvValue("CompactionL0Trigger", opt.DefaultCompactionL0Trigger)),
  73. }
  74. return open(location, opts)
  75. }
  76. // OpenRO attempts to open the database at the given location, read only.
  77. func OpenRO(location string) (*Lowlevel, error) {
  78. opts := &opt.Options{
  79. OpenFilesCacheCapacity: dbMaxOpenFiles,
  80. ReadOnly: true,
  81. }
  82. return open(location, opts)
  83. }
  84. func open(location string, opts *opt.Options) (*Lowlevel, error) {
  85. db, err := leveldb.OpenFile(location, opts)
  86. if leveldbIsCorrupted(err) {
  87. db, err = leveldb.RecoverFile(location, opts)
  88. }
  89. if leveldbIsCorrupted(err) {
  90. // The database is corrupted, and we've tried to recover it but it
  91. // didn't work. At this point there isn't much to do beyond dropping
  92. // the database and reindexing...
  93. l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
  94. if err := os.RemoveAll(location); err != nil {
  95. return nil, errorSuggestion{err, "failed to delete corrupted database"}
  96. }
  97. db, err = leveldb.OpenFile(location, opts)
  98. }
  99. if err != nil {
  100. return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
  101. }
  102. if debugEnvValue("CompactEverything", 0) != 0 {
  103. if err := db.CompactRange(util.Range{}); err != nil {
  104. l.Warnln("Compacting database:", err)
  105. }
  106. }
  107. return NewLowlevel(db, location), nil
  108. }
  109. // OpenMemory returns a new Lowlevel referencing an in-memory database.
  110. func OpenMemory() *Lowlevel {
  111. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  112. return NewLowlevel(db, "<memory>")
  113. }
  114. // ListFolders returns the list of folders currently in the database
  115. func (db *Lowlevel) ListFolders() []string {
  116. return db.folderIdx.Values()
  117. }
  118. // Committed returns the number of items committed to the database since startup
  119. func (db *Lowlevel) Committed() int64 {
  120. return atomic.LoadInt64(&db.committed)
  121. }
  122. func (db *Lowlevel) Put(key, val []byte, wo *opt.WriteOptions) error {
  123. db.closeMut.RLock()
  124. defer db.closeMut.RUnlock()
  125. if db.closed {
  126. return leveldb.ErrClosed
  127. }
  128. atomic.AddInt64(&db.committed, 1)
  129. return db.DB.Put(key, val, wo)
  130. }
  131. func (db *Lowlevel) Write(batch *leveldb.Batch, wo *opt.WriteOptions) error {
  132. db.closeMut.RLock()
  133. defer db.closeMut.RUnlock()
  134. if db.closed {
  135. return leveldb.ErrClosed
  136. }
  137. return db.DB.Write(batch, wo)
  138. }
  139. func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error {
  140. db.closeMut.RLock()
  141. defer db.closeMut.RUnlock()
  142. if db.closed {
  143. return leveldb.ErrClosed
  144. }
  145. atomic.AddInt64(&db.committed, 1)
  146. return db.DB.Delete(key, wo)
  147. }
  148. func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
  149. return db.newIterator(func() iterator.Iterator { return db.DB.NewIterator(slice, ro) })
  150. }
  151. // newIterator returns an iterator created with the given constructor only if db
  152. // is not yet closed. If it is closed, a closedIter is returned instead.
  153. func (db *Lowlevel) newIterator(constr func() iterator.Iterator) iterator.Iterator {
  154. db.closeMut.RLock()
  155. defer db.closeMut.RUnlock()
  156. if db.closed {
  157. return &closedIter{}
  158. }
  159. db.iterWG.Add(1)
  160. return &iter{
  161. Iterator: constr(),
  162. db: db,
  163. }
  164. }
  165. func (db *Lowlevel) GetSnapshot() snapshot {
  166. s, err := db.DB.GetSnapshot()
  167. if err != nil {
  168. if err == leveldb.ErrClosed {
  169. return &closedSnap{}
  170. }
  171. panic(err)
  172. }
  173. return &snap{
  174. Snapshot: s,
  175. db: db,
  176. }
  177. }
  178. func (db *Lowlevel) Close() {
  179. db.closeMut.Lock()
  180. if db.closed {
  181. db.closeMut.Unlock()
  182. return
  183. }
  184. db.closed = true
  185. db.closeMut.Unlock()
  186. db.iterWG.Wait()
  187. db.DB.Close()
  188. }
  189. // NewLowlevel wraps the given *leveldb.DB into a *lowlevel
  190. func NewLowlevel(db *leveldb.DB, location string) *Lowlevel {
  191. return &Lowlevel{
  192. DB: db,
  193. location: location,
  194. folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
  195. deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
  196. closeMut: &sync.RWMutex{},
  197. iterWG: sync.WaitGroup{},
  198. }
  199. }
  200. // A "better" version of leveldb's errors.IsCorrupted.
  201. func leveldbIsCorrupted(err error) bool {
  202. switch {
  203. case err == nil:
  204. return false
  205. case errors.IsCorrupted(err):
  206. return true
  207. case strings.Contains(err.Error(), "corrupted"):
  208. return true
  209. }
  210. return false
  211. }
  212. type batch struct {
  213. *leveldb.Batch
  214. db *Lowlevel
  215. }
  216. func (db *Lowlevel) newBatch() *batch {
  217. return &batch{
  218. Batch: new(leveldb.Batch),
  219. db: db,
  220. }
  221. }
  222. // checkFlush flushes and resets the batch if its size exceeds dbFlushBatch.
  223. func (b *batch) checkFlush() {
  224. if len(b.Dump()) > dbFlushBatch {
  225. b.flush()
  226. b.Reset()
  227. }
  228. }
  229. func (b *batch) flush() {
  230. if err := b.db.Write(b.Batch, nil); err != nil && err != leveldb.ErrClosed {
  231. panic(err)
  232. }
  233. }
  234. type closedIter struct{}
  235. func (it *closedIter) Release() {}
  236. func (it *closedIter) Key() []byte { return nil }
  237. func (it *closedIter) Value() []byte { return nil }
  238. func (it *closedIter) Next() bool { return false }
  239. func (it *closedIter) Prev() bool { return false }
  240. func (it *closedIter) First() bool { return false }
  241. func (it *closedIter) Last() bool { return false }
  242. func (it *closedIter) Seek(key []byte) bool { return false }
  243. func (it *closedIter) Valid() bool { return false }
  244. func (it *closedIter) Error() error { return leveldb.ErrClosed }
  245. func (it *closedIter) SetReleaser(releaser util.Releaser) {}
  246. type snapshot interface {
  247. Get([]byte, *opt.ReadOptions) ([]byte, error)
  248. Has([]byte, *opt.ReadOptions) (bool, error)
  249. NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator
  250. Release()
  251. }
  252. type closedSnap struct{}
  253. func (s *closedSnap) Get([]byte, *opt.ReadOptions) ([]byte, error) { return nil, leveldb.ErrClosed }
  254. func (s *closedSnap) Has([]byte, *opt.ReadOptions) (bool, error) { return false, leveldb.ErrClosed }
  255. func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator {
  256. return &closedIter{}
  257. }
  258. func (s *closedSnap) Release() {}
  259. type snap struct {
  260. *leveldb.Snapshot
  261. db *Lowlevel
  262. }
  263. func (s *snap) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
  264. return s.db.newIterator(func() iterator.Iterator { return s.Snapshot.NewIterator(slice, ro) })
  265. }
  266. // iter implements iterator.Iterator which allows tracking active iterators
  267. // and aborts if the underlying database is being closed.
  268. type iter struct {
  269. iterator.Iterator
  270. db *Lowlevel
  271. }
  272. func (it *iter) Release() {
  273. it.db.iterWG.Done()
  274. it.Iterator.Release()
  275. }
  276. func (it *iter) Next() bool {
  277. return it.execIfNotClosed(it.Iterator.Next)
  278. }
  279. func (it *iter) Prev() bool {
  280. return it.execIfNotClosed(it.Iterator.Prev)
  281. }
  282. func (it *iter) First() bool {
  283. return it.execIfNotClosed(it.Iterator.First)
  284. }
  285. func (it *iter) Last() bool {
  286. return it.execIfNotClosed(it.Iterator.Last)
  287. }
  288. func (it *iter) Seek(key []byte) bool {
  289. return it.execIfNotClosed(func() bool {
  290. return it.Iterator.Seek(key)
  291. })
  292. }
  293. func (it *iter) execIfNotClosed(fn func() bool) bool {
  294. it.db.closeMut.RLock()
  295. defer it.db.closeMut.RUnlock()
  296. if it.db.closed {
  297. return false
  298. }
  299. return fn()
  300. }
  301. func debugEnvValue(key string, def int) int {
  302. v, err := strconv.ParseInt(os.Getenv("STDEBUG_"+key), 10, 63)
  303. if err != nil {
  304. return def
  305. }
  306. return int(v)
  307. }