lowlevel.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "github.com/syndtr/goleveldb/leveldb"
  14. "github.com/syndtr/goleveldb/leveldb/errors"
  15. "github.com/syndtr/goleveldb/leveldb/iterator"
  16. "github.com/syndtr/goleveldb/leveldb/opt"
  17. "github.com/syndtr/goleveldb/leveldb/storage"
  18. "github.com/syndtr/goleveldb/leveldb/util"
  19. )
const (
	// dbMaxOpenFiles limits the number of open files leveldb may use.
	dbMaxOpenFiles = 100
	// dbFlushBatch is the serialized size (4 MiB) at which a write batch
	// is flushed to the database; see (*batch).checkFlush.
	dbFlushBatch = 4 << MiB
	// A large database is > 200 MiB. It's a mostly arbitrary value, but
	// it's also the case that each file is 2 MiB by default and when we
	// have dbMaxOpenFiles of them we will need to start thrashing fd:s.
	// Switching to large database settings causes larger files to be used
	// when compacting, reducing the number.
	dbLargeThreshold = dbMaxOpenFiles * (2 << MiB)

	// Binary-prefix shift amounts: 1<<KiB == 1024, 1<<MiB == 1024*1024.
	// Used in size expressions such as "4 << MiB" above.
	KiB = 10
	MiB = 20
)
// Tuning selects a database tuning profile (small/large/auto-detected).
type Tuning int

const (
	// N.b. these constants must match those in lib/config.Tuning!
	TuningAuto Tuning = iota
	TuningSmall
	TuningLarge
)
// Lowlevel is the lowest level database interface. It has a very simple
// purpose: hold the actual *leveldb.DB database, and the in-memory state
// that belong to that database. In the same way that a single on disk
// database can only be opened once, there should be only one Lowlevel for
// any given *leveldb.DB.
type Lowlevel struct {
	// committed counts items written via Put/Delete since startup. It is
	// accessed atomically and must come first so it stays 64-bit aligned
	// on 32-bit platforms, as sync/atomic requires.
	committed int64 // atomic, must come first
	*leveldb.DB
	location  string         // path the database was opened at ("<memory>" for in-memory)
	folderIdx *smallIndex    // small index over folder names (see ListFolders)
	deviceIdx *smallIndex    // small index over device keys
	closed    bool           // set under closeMut once Close has run
	closeMut  *sync.RWMutex  // guards closed; held (read) during all DB operations
	iterWG    sync.WaitGroup // tracks live iterators so Close can wait for them
}
  54. // Open attempts to open the database at the given location, and runs
  55. // recovery on it if opening fails. Worst case, if recovery is not possible,
  56. // the database is erased and created from scratch.
  57. func Open(location string, tuning Tuning) (*Lowlevel, error) {
  58. opts := optsFor(location, tuning)
  59. return open(location, opts)
  60. }
  61. // optsFor returns the database options to use when opening a database with
  62. // the given location and tuning. Settings can be overridden by debug
  63. // environment variables.
  64. func optsFor(location string, tuning Tuning) *opt.Options {
  65. large := false
  66. switch tuning {
  67. case TuningLarge:
  68. large = true
  69. case TuningAuto:
  70. large = dbIsLarge(location)
  71. }
  72. var (
  73. // Set defaults used for small databases.
  74. defaultBlockCacheCapacity = 0 // 0 means let leveldb use default
  75. defaultBlockSize = 0
  76. defaultCompactionTableSize = 0
  77. defaultCompactionTableSizeMultiplier = 0
  78. defaultWriteBuffer = 16 << MiB // increased from leveldb default of 4 MiB
  79. defaultCompactionL0Trigger = opt.DefaultCompactionL0Trigger // explicit because we use it as base for other stuff
  80. )
  81. if large {
  82. // Change the parameters for better throughput at the price of some
  83. // RAM and larger files. This results in larger batches of writes
  84. // and compaction at a lower frequency.
  85. l.Infoln("Using large-database tuning")
  86. defaultBlockCacheCapacity = 64 << MiB
  87. defaultBlockSize = 64 << KiB
  88. defaultCompactionTableSize = 16 << MiB
  89. defaultCompactionTableSizeMultiplier = 20 // 2.0 after division by ten
  90. defaultWriteBuffer = 64 << MiB
  91. defaultCompactionL0Trigger = 8 // number of l0 files
  92. }
  93. opts := &opt.Options{
  94. BlockCacheCapacity: debugEnvValue("BlockCacheCapacity", defaultBlockCacheCapacity),
  95. BlockCacheEvictRemoved: debugEnvValue("BlockCacheEvictRemoved", 0) != 0,
  96. BlockRestartInterval: debugEnvValue("BlockRestartInterval", 0),
  97. BlockSize: debugEnvValue("BlockSize", defaultBlockSize),
  98. CompactionExpandLimitFactor: debugEnvValue("CompactionExpandLimitFactor", 0),
  99. CompactionGPOverlapsFactor: debugEnvValue("CompactionGPOverlapsFactor", 0),
  100. CompactionL0Trigger: debugEnvValue("CompactionL0Trigger", defaultCompactionL0Trigger),
  101. CompactionSourceLimitFactor: debugEnvValue("CompactionSourceLimitFactor", 0),
  102. CompactionTableSize: debugEnvValue("CompactionTableSize", defaultCompactionTableSize),
  103. CompactionTableSizeMultiplier: float64(debugEnvValue("CompactionTableSizeMultiplier", defaultCompactionTableSizeMultiplier)) / 10.0,
  104. CompactionTotalSize: debugEnvValue("CompactionTotalSize", 0),
  105. CompactionTotalSizeMultiplier: float64(debugEnvValue("CompactionTotalSizeMultiplier", 0)) / 10.0,
  106. DisableBufferPool: debugEnvValue("DisableBufferPool", 0) != 0,
  107. DisableBlockCache: debugEnvValue("DisableBlockCache", 0) != 0,
  108. DisableCompactionBackoff: debugEnvValue("DisableCompactionBackoff", 0) != 0,
  109. DisableLargeBatchTransaction: debugEnvValue("DisableLargeBatchTransaction", 0) != 0,
  110. NoSync: debugEnvValue("NoSync", 0) != 0,
  111. NoWriteMerge: debugEnvValue("NoWriteMerge", 0) != 0,
  112. OpenFilesCacheCapacity: debugEnvValue("OpenFilesCacheCapacity", dbMaxOpenFiles),
  113. WriteBuffer: debugEnvValue("WriteBuffer", defaultWriteBuffer),
  114. // The write slowdown and pause can be overridden, but even if they
  115. // are not and the compaction trigger is overridden we need to
  116. // adjust so that we don't pause writes for L0 compaction before we
  117. // even *start* L0 compaction...
  118. WriteL0SlowdownTrigger: debugEnvValue("WriteL0SlowdownTrigger", 2*debugEnvValue("CompactionL0Trigger", defaultCompactionL0Trigger)),
  119. WriteL0PauseTrigger: debugEnvValue("WriteL0SlowdownTrigger", 3*debugEnvValue("CompactionL0Trigger", defaultCompactionL0Trigger)),
  120. }
  121. return opts
  122. }
  123. // OpenRO attempts to open the database at the given location, read only.
  124. func OpenRO(location string) (*Lowlevel, error) {
  125. opts := &opt.Options{
  126. OpenFilesCacheCapacity: dbMaxOpenFiles,
  127. ReadOnly: true,
  128. }
  129. return open(location, opts)
  130. }
// open opens the database at location with the given options. If the
// database is corrupted it attempts leveldb recovery; if that also fails,
// the on-disk database is deleted and recreated from scratch (the index
// will be rebuilt by rescanning). Any remaining error is wrapped with a
// user-facing suggestion.
func open(location string, opts *opt.Options) (*Lowlevel, error) {
	db, err := leveldb.OpenFile(location, opts)
	if leveldbIsCorrupted(err) {
		// First line of defense: leveldb's own recovery.
		db, err = leveldb.RecoverFile(location, opts)
	}
	if leveldbIsCorrupted(err) {
		// The database is corrupted, and we've tried to recover it but it
		// didn't work. At this point there isn't much to do beyond dropping
		// the database and reindexing...
		l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
		if err := os.RemoveAll(location); err != nil {
			return nil, errorSuggestion{err, "failed to delete corrupted database"}
		}
		db, err = leveldb.OpenFile(location, opts)
	}
	if err != nil {
		// Most common cause of a lingering open failure is a second
		// process holding the database lock.
		return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
	}

	// Debug hook: force a full compaction at startup.
	if debugEnvValue("CompactEverything", 0) != 0 {
		if err := db.CompactRange(util.Range{}); err != nil {
			l.Warnln("Compacting database:", err)
		}
	}

	return NewLowlevel(db, location), nil
}
  156. // OpenMemory returns a new Lowlevel referencing an in-memory database.
  157. func OpenMemory() *Lowlevel {
  158. db, _ := leveldb.Open(storage.NewMemStorage(), nil)
  159. return NewLowlevel(db, "<memory>")
  160. }
  161. // ListFolders returns the list of folders currently in the database
  162. func (db *Lowlevel) ListFolders() []string {
  163. return db.folderIdx.Values()
  164. }
  165. // Committed returns the number of items committed to the database since startup
  166. func (db *Lowlevel) Committed() int64 {
  167. return atomic.LoadInt64(&db.committed)
  168. }
  169. func (db *Lowlevel) Put(key, val []byte, wo *opt.WriteOptions) error {
  170. db.closeMut.RLock()
  171. defer db.closeMut.RUnlock()
  172. if db.closed {
  173. return leveldb.ErrClosed
  174. }
  175. atomic.AddInt64(&db.committed, 1)
  176. return db.DB.Put(key, val, wo)
  177. }
  178. func (db *Lowlevel) Write(batch *leveldb.Batch, wo *opt.WriteOptions) error {
  179. db.closeMut.RLock()
  180. defer db.closeMut.RUnlock()
  181. if db.closed {
  182. return leveldb.ErrClosed
  183. }
  184. return db.DB.Write(batch, wo)
  185. }
  186. func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error {
  187. db.closeMut.RLock()
  188. defer db.closeMut.RUnlock()
  189. if db.closed {
  190. return leveldb.ErrClosed
  191. }
  192. atomic.AddInt64(&db.committed, 1)
  193. return db.DB.Delete(key, wo)
  194. }
  195. func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
  196. return db.newIterator(func() iterator.Iterator { return db.DB.NewIterator(slice, ro) })
  197. }
  198. // newIterator returns an iterator created with the given constructor only if db
  199. // is not yet closed. If it is closed, a closedIter is returned instead.
  200. func (db *Lowlevel) newIterator(constr func() iterator.Iterator) iterator.Iterator {
  201. db.closeMut.RLock()
  202. defer db.closeMut.RUnlock()
  203. if db.closed {
  204. return &closedIter{}
  205. }
  206. db.iterWG.Add(1)
  207. return &iter{
  208. Iterator: constr(),
  209. db: db,
  210. }
  211. }
  212. func (db *Lowlevel) GetSnapshot() snapshot {
  213. s, err := db.DB.GetSnapshot()
  214. if err != nil {
  215. if err == leveldb.ErrClosed {
  216. return &closedSnap{}
  217. }
  218. panic(err)
  219. }
  220. return &snap{
  221. Snapshot: s,
  222. db: db,
  223. }
  224. }
  225. func (db *Lowlevel) Close() {
  226. db.closeMut.Lock()
  227. if db.closed {
  228. db.closeMut.Unlock()
  229. return
  230. }
  231. db.closed = true
  232. db.closeMut.Unlock()
  233. db.iterWG.Wait()
  234. db.DB.Close()
  235. }
  236. // dbIsLarge returns whether the estimated size of the database at location
  237. // is large enough to warrant optimization for large databases.
  238. func dbIsLarge(location string) bool {
  239. if ^uint(0)>>63 == 0 {
  240. // We're compiled for a 32 bit architecture. We've seen trouble with
  241. // large settings there.
  242. // (https://forum.syncthing.net/t/many-small-ldb-files-with-database-tuning/13842)
  243. return false
  244. }
  245. dir, err := os.Open(location)
  246. if err != nil {
  247. return false
  248. }
  249. fis, err := dir.Readdir(-1)
  250. if err != nil {
  251. return false
  252. }
  253. var size int64
  254. for _, fi := range fis {
  255. if fi.Name() == "LOG" {
  256. // don't count the size
  257. continue
  258. }
  259. size += fi.Size()
  260. }
  261. return size > dbLargeThreshold
  262. }
  263. // NewLowlevel wraps the given *leveldb.DB into a *lowlevel
  264. func NewLowlevel(db *leveldb.DB, location string) *Lowlevel {
  265. return &Lowlevel{
  266. DB: db,
  267. location: location,
  268. folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
  269. deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
  270. closeMut: &sync.RWMutex{},
  271. iterWG: sync.WaitGroup{},
  272. }
  273. }
  274. // A "better" version of leveldb's errors.IsCorrupted.
  275. func leveldbIsCorrupted(err error) bool {
  276. switch {
  277. case err == nil:
  278. return false
  279. case errors.IsCorrupted(err):
  280. return true
  281. case strings.Contains(err.Error(), "corrupted"):
  282. return true
  283. }
  284. return false
  285. }
// batch is a leveldb.Batch bound to the Lowlevel it will eventually be
// written to, so it can flush itself (see checkFlush/flush).
type batch struct {
	*leveldb.Batch
	db *Lowlevel
}
  290. func (db *Lowlevel) newBatch() *batch {
  291. return &batch{
  292. Batch: new(leveldb.Batch),
  293. db: db,
  294. }
  295. }
  296. // checkFlush flushes and resets the batch if its size exceeds dbFlushBatch.
  297. func (b *batch) checkFlush() {
  298. if len(b.Dump()) > dbFlushBatch {
  299. b.flush()
  300. b.Reset()
  301. }
  302. }
  303. func (b *batch) flush() {
  304. if err := b.db.Write(b.Batch, nil); err != nil && err != leveldb.ErrClosed {
  305. panic(err)
  306. }
  307. }
// closedIter is a permanently exhausted iterator.Iterator handed out once
// the database has been closed: every positioning call reports false and
// Error returns leveldb.ErrClosed.
type closedIter struct{}

func (it *closedIter) Release()                           {}
func (it *closedIter) Key() []byte                        { return nil }
func (it *closedIter) Value() []byte                      { return nil }
func (it *closedIter) Next() bool                         { return false }
func (it *closedIter) Prev() bool                         { return false }
func (it *closedIter) First() bool                        { return false }
func (it *closedIter) Last() bool                         { return false }
func (it *closedIter) Seek(key []byte) bool               { return false }
func (it *closedIter) Valid() bool                        { return false }
func (it *closedIter) Error() error                       { return leveldb.ErrClosed }
func (it *closedIter) SetReleaser(releaser util.Releaser) {}
// snapshot is the subset of *leveldb.Snapshot functionality we use; it
// exists so a closed stand-in (closedSnap) can be returned by GetSnapshot.
type snapshot interface {
	Get([]byte, *opt.ReadOptions) ([]byte, error)
	Has([]byte, *opt.ReadOptions) (bool, error)
	NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator
	Release()
}
// closedSnap is the snapshot returned once the database has been closed:
// reads fail with leveldb.ErrClosed and iteration yields nothing.
type closedSnap struct{}

func (s *closedSnap) Get([]byte, *opt.ReadOptions) ([]byte, error) { return nil, leveldb.ErrClosed }
func (s *closedSnap) Has([]byte, *opt.ReadOptions) (bool, error)   { return false, leveldb.ErrClosed }
func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator {
	return &closedIter{}
}
func (s *closedSnap) Release() {}
// snap wraps a live *leveldb.Snapshot together with its owning database,
// so iterators created from it are tracked like database iterators.
type snap struct {
	*leveldb.Snapshot
	db *Lowlevel
}
  337. func (s *snap) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
  338. return s.db.newIterator(func() iterator.Iterator { return s.Snapshot.NewIterator(slice, ro) })
  339. }
// iter implements iterator.Iterator which allows tracking active iterators
// and aborts if the underlying database is being closed.
type iter struct {
	iterator.Iterator
	db *Lowlevel // owning database; consulted for the closed flag
}
  346. func (it *iter) Release() {
  347. it.db.iterWG.Done()
  348. it.Iterator.Release()
  349. }
  350. func (it *iter) Next() bool {
  351. return it.execIfNotClosed(it.Iterator.Next)
  352. }
  353. func (it *iter) Prev() bool {
  354. return it.execIfNotClosed(it.Iterator.Prev)
  355. }
  356. func (it *iter) First() bool {
  357. return it.execIfNotClosed(it.Iterator.First)
  358. }
  359. func (it *iter) Last() bool {
  360. return it.execIfNotClosed(it.Iterator.Last)
  361. }
  362. func (it *iter) Seek(key []byte) bool {
  363. return it.execIfNotClosed(func() bool {
  364. return it.Iterator.Seek(key)
  365. })
  366. }
  367. func (it *iter) execIfNotClosed(fn func() bool) bool {
  368. it.db.closeMut.RLock()
  369. defer it.db.closeMut.RUnlock()
  370. if it.db.closed {
  371. return false
  372. }
  373. return fn()
  374. }
  375. func debugEnvValue(key string, def int) int {
  376. v, err := strconv.ParseInt(os.Getenv("STDEBUG_"+key), 10, 63)
  377. if err != nil {
  378. return def
  379. }
  380. return int(v)
  381. }