| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- // Copyright (C) 2018 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package db
- import (
- "os"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/iterator"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/storage"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
- const (
- dbMaxOpenFiles = 100
- dbWriteBuffer = 16 << 20
- )
- var (
- dbFlushBatch = debugEnvValue("WriteBuffer", dbWriteBuffer) / 4 // Some leeway for any leveldb in-memory optimizations
- )
- // Lowlevel is the lowest level database interface. It has a very simple
- // purpose: hold the actual *leveldb.DB database, and the in-memory state
- // that belong to that database. In the same way that a single on disk
- // database can only be opened once, there should be only one Lowlevel for
- // any given *leveldb.DB.
- type Lowlevel struct {
- committed int64 // atomic, must come first
- *leveldb.DB
- location string
- folderIdx *smallIndex
- deviceIdx *smallIndex
- closed bool
- closeMut *sync.RWMutex
- iterWG sync.WaitGroup
- }
- // Open attempts to open the database at the given location, and runs
- // recovery on it if opening fails. Worst case, if recovery is not possible,
- // the database is erased and created from scratch.
- func Open(location string) (*Lowlevel, error) {
- opts := &opt.Options{
- BlockCacheCapacity: debugEnvValue("BlockCacheCapacity", 0),
- BlockCacheEvictRemoved: debugEnvValue("BlockCacheEvictRemoved", 0) != 0,
- BlockRestartInterval: debugEnvValue("BlockRestartInterval", 0),
- BlockSize: debugEnvValue("BlockSize", 0),
- CompactionExpandLimitFactor: debugEnvValue("CompactionExpandLimitFactor", 0),
- CompactionGPOverlapsFactor: debugEnvValue("CompactionGPOverlapsFactor", 0),
- CompactionL0Trigger: debugEnvValue("CompactionL0Trigger", 0),
- CompactionSourceLimitFactor: debugEnvValue("CompactionSourceLimitFactor", 0),
- CompactionTableSize: debugEnvValue("CompactionTableSize", 0),
- CompactionTableSizeMultiplier: float64(debugEnvValue("CompactionTableSizeMultiplier", 0)) / 10.0,
- CompactionTotalSize: debugEnvValue("CompactionTotalSize", 0),
- CompactionTotalSizeMultiplier: float64(debugEnvValue("CompactionTotalSizeMultiplier", 0)) / 10.0,
- DisableBufferPool: debugEnvValue("DisableBufferPool", 0) != 0,
- DisableBlockCache: debugEnvValue("DisableBlockCache", 0) != 0,
- DisableCompactionBackoff: debugEnvValue("DisableCompactionBackoff", 0) != 0,
- DisableLargeBatchTransaction: debugEnvValue("DisableLargeBatchTransaction", 0) != 0,
- NoSync: debugEnvValue("NoSync", 0) != 0,
- NoWriteMerge: debugEnvValue("NoWriteMerge", 0) != 0,
- OpenFilesCacheCapacity: debugEnvValue("OpenFilesCacheCapacity", dbMaxOpenFiles),
- WriteBuffer: debugEnvValue("WriteBuffer", dbWriteBuffer),
- // The write slowdown and pause can be overridden, but even if they
- // are not and the compaction trigger is overridden we need to
- // adjust so that we don't pause writes for L0 compaction before we
- // even *start* L0 compaction...
- WriteL0SlowdownTrigger: debugEnvValue("WriteL0SlowdownTrigger", 2*debugEnvValue("CompactionL0Trigger", opt.DefaultCompactionL0Trigger)),
- WriteL0PauseTrigger: debugEnvValue("WriteL0SlowdownTrigger", 3*debugEnvValue("CompactionL0Trigger", opt.DefaultCompactionL0Trigger)),
- }
- return open(location, opts)
- }
- // OpenRO attempts to open the database at the given location, read only.
- func OpenRO(location string) (*Lowlevel, error) {
- opts := &opt.Options{
- OpenFilesCacheCapacity: dbMaxOpenFiles,
- ReadOnly: true,
- }
- return open(location, opts)
- }
- func open(location string, opts *opt.Options) (*Lowlevel, error) {
- db, err := leveldb.OpenFile(location, opts)
- if leveldbIsCorrupted(err) {
- db, err = leveldb.RecoverFile(location, opts)
- }
- if leveldbIsCorrupted(err) {
- // The database is corrupted, and we've tried to recover it but it
- // didn't work. At this point there isn't much to do beyond dropping
- // the database and reindexing...
- l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
- if err := os.RemoveAll(location); err != nil {
- return nil, errorSuggestion{err, "failed to delete corrupted database"}
- }
- db, err = leveldb.OpenFile(location, opts)
- }
- if err != nil {
- return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
- }
- if debugEnvValue("CompactEverything", 0) != 0 {
- if err := db.CompactRange(util.Range{}); err != nil {
- l.Warnln("Compacting database:", err)
- }
- }
- return NewLowlevel(db, location), nil
- }
- // OpenMemory returns a new Lowlevel referencing an in-memory database.
- func OpenMemory() *Lowlevel {
- db, _ := leveldb.Open(storage.NewMemStorage(), nil)
- return NewLowlevel(db, "<memory>")
- }
- // ListFolders returns the list of folders currently in the database
- func (db *Lowlevel) ListFolders() []string {
- return db.folderIdx.Values()
- }
- // Committed returns the number of items committed to the database since startup
- func (db *Lowlevel) Committed() int64 {
- return atomic.LoadInt64(&db.committed)
- }
- func (db *Lowlevel) Put(key, val []byte, wo *opt.WriteOptions) error {
- db.closeMut.RLock()
- defer db.closeMut.RUnlock()
- if db.closed {
- return leveldb.ErrClosed
- }
- atomic.AddInt64(&db.committed, 1)
- return db.DB.Put(key, val, wo)
- }
- func (db *Lowlevel) Write(batch *leveldb.Batch, wo *opt.WriteOptions) error {
- db.closeMut.RLock()
- defer db.closeMut.RUnlock()
- if db.closed {
- return leveldb.ErrClosed
- }
- return db.DB.Write(batch, wo)
- }
- func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error {
- db.closeMut.RLock()
- defer db.closeMut.RUnlock()
- if db.closed {
- return leveldb.ErrClosed
- }
- atomic.AddInt64(&db.committed, 1)
- return db.DB.Delete(key, wo)
- }
- func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
- return db.newIterator(func() iterator.Iterator { return db.DB.NewIterator(slice, ro) })
- }
- // newIterator returns an iterator created with the given constructor only if db
- // is not yet closed. If it is closed, a closedIter is returned instead.
- func (db *Lowlevel) newIterator(constr func() iterator.Iterator) iterator.Iterator {
- db.closeMut.RLock()
- defer db.closeMut.RUnlock()
- if db.closed {
- return &closedIter{}
- }
- db.iterWG.Add(1)
- return &iter{
- Iterator: constr(),
- db: db,
- }
- }
- func (db *Lowlevel) GetSnapshot() snapshot {
- s, err := db.DB.GetSnapshot()
- if err != nil {
- if err == leveldb.ErrClosed {
- return &closedSnap{}
- }
- panic(err)
- }
- return &snap{
- Snapshot: s,
- db: db,
- }
- }
- func (db *Lowlevel) Close() {
- db.closeMut.Lock()
- if db.closed {
- db.closeMut.Unlock()
- return
- }
- db.closed = true
- db.closeMut.Unlock()
- db.iterWG.Wait()
- db.DB.Close()
- }
- // NewLowlevel wraps the given *leveldb.DB into a *lowlevel
- func NewLowlevel(db *leveldb.DB, location string) *Lowlevel {
- return &Lowlevel{
- DB: db,
- location: location,
- folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
- deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
- closeMut: &sync.RWMutex{},
- iterWG: sync.WaitGroup{},
- }
- }
- // A "better" version of leveldb's errors.IsCorrupted.
- func leveldbIsCorrupted(err error) bool {
- switch {
- case err == nil:
- return false
- case errors.IsCorrupted(err):
- return true
- case strings.Contains(err.Error(), "corrupted"):
- return true
- }
- return false
- }
- type batch struct {
- *leveldb.Batch
- db *Lowlevel
- }
- func (db *Lowlevel) newBatch() *batch {
- return &batch{
- Batch: new(leveldb.Batch),
- db: db,
- }
- }
- // checkFlush flushes and resets the batch if its size exceeds dbFlushBatch.
- func (b *batch) checkFlush() {
- if len(b.Dump()) > dbFlushBatch {
- b.flush()
- b.Reset()
- }
- }
- func (b *batch) flush() {
- if err := b.db.Write(b.Batch, nil); err != nil && err != leveldb.ErrClosed {
- panic(err)
- }
- }
- type closedIter struct{}
- func (it *closedIter) Release() {}
- func (it *closedIter) Key() []byte { return nil }
- func (it *closedIter) Value() []byte { return nil }
- func (it *closedIter) Next() bool { return false }
- func (it *closedIter) Prev() bool { return false }
- func (it *closedIter) First() bool { return false }
- func (it *closedIter) Last() bool { return false }
- func (it *closedIter) Seek(key []byte) bool { return false }
- func (it *closedIter) Valid() bool { return false }
- func (it *closedIter) Error() error { return leveldb.ErrClosed }
- func (it *closedIter) SetReleaser(releaser util.Releaser) {}
- type snapshot interface {
- Get([]byte, *opt.ReadOptions) ([]byte, error)
- Has([]byte, *opt.ReadOptions) (bool, error)
- NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator
- Release()
- }
- type closedSnap struct{}
- func (s *closedSnap) Get([]byte, *opt.ReadOptions) ([]byte, error) { return nil, leveldb.ErrClosed }
- func (s *closedSnap) Has([]byte, *opt.ReadOptions) (bool, error) { return false, leveldb.ErrClosed }
- func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator {
- return &closedIter{}
- }
- func (s *closedSnap) Release() {}
- type snap struct {
- *leveldb.Snapshot
- db *Lowlevel
- }
- func (s *snap) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
- return s.db.newIterator(func() iterator.Iterator { return s.Snapshot.NewIterator(slice, ro) })
- }
- // iter implements iterator.Iterator which allows tracking active iterators
- // and aborts if the underlying database is being closed.
- type iter struct {
- iterator.Iterator
- db *Lowlevel
- }
- func (it *iter) Release() {
- it.db.iterWG.Done()
- it.Iterator.Release()
- }
- func (it *iter) Next() bool {
- return it.execIfNotClosed(it.Iterator.Next)
- }
- func (it *iter) Prev() bool {
- return it.execIfNotClosed(it.Iterator.Prev)
- }
- func (it *iter) First() bool {
- return it.execIfNotClosed(it.Iterator.First)
- }
- func (it *iter) Last() bool {
- return it.execIfNotClosed(it.Iterator.Last)
- }
- func (it *iter) Seek(key []byte) bool {
- return it.execIfNotClosed(func() bool {
- return it.Iterator.Seek(key)
- })
- }
- func (it *iter) execIfNotClosed(fn func() bool) bool {
- it.db.closeMut.RLock()
- defer it.db.closeMut.RUnlock()
- if it.db.closed {
- return false
- }
- return fn()
- }
- func debugEnvValue(key string, def int) int {
- v, err := strconv.ParseInt(os.Getenv("STDEBUG_"+key), 10, 63)
- if err != nil {
- return def
- }
- return int(v)
- }
|