badger_backend.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. // Copyright (C) 2019 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package backend
  7. import (
  8. "bytes"
  9. "errors"
  10. "time"
  11. badger "github.com/dgraph-io/badger/v2"
  12. )
  13. const (
  14. checkpointFlushMinSize = 128 << KiB
  15. maxCacheSize = 64 << MiB
  16. )
  17. func OpenBadger(path string) (Backend, error) {
  18. opts := badger.DefaultOptions(path)
  19. opts = opts.WithMaxCacheSize(maxCacheSize).WithCompactL0OnClose(false)
  20. opts.Logger = nil
  21. return openBadger(opts)
  22. }
  23. func OpenBadgerMemory() Backend {
  24. opts := badger.DefaultOptions("").WithInMemory(true)
  25. opts.Logger = nil
  26. backend, err := openBadger(opts)
  27. if err != nil {
  28. // Opening in-memory should never be able to fail, and is anyway
  29. // used just by tests.
  30. panic(err)
  31. }
  32. return backend
  33. }
  34. func openBadger(opts badger.Options) (Backend, error) {
  35. // XXX: We should find good values for memory utilization in the "small"
  36. // and "large" cases we support for LevelDB. Some notes here:
  37. // https://github.com/dgraph-io/badger/tree/v2.0.3#memory-usage
  38. bdb, err := badger.Open(opts)
  39. if err != nil {
  40. return nil, wrapBadgerErr(err)
  41. }
  42. return &badgerBackend{
  43. bdb: bdb,
  44. closeWG: &closeWaitGroup{},
  45. }, nil
  46. }
  47. // badgerBackend implements Backend on top of a badger
  48. type badgerBackend struct {
  49. bdb *badger.DB
  50. closeWG *closeWaitGroup
  51. }
  52. func (b *badgerBackend) NewReadTransaction() (ReadTransaction, error) {
  53. rel, err := newReleaser(b.closeWG)
  54. if err != nil {
  55. return nil, err
  56. }
  57. return badgerSnapshot{
  58. txn: b.bdb.NewTransaction(false),
  59. rel: rel,
  60. }, nil
  61. }
  62. func (b *badgerBackend) NewWriteTransaction(hooks ...CommitHook) (WriteTransaction, error) {
  63. rel1, err := newReleaser(b.closeWG)
  64. if err != nil {
  65. return nil, err
  66. }
  67. rel2, err := newReleaser(b.closeWG)
  68. if err != nil {
  69. rel1.Release()
  70. return nil, err
  71. }
  72. // We use two transactions here to preserve the property that our
  73. // leveldb wrapper has, that writes in a transaction are completely
  74. // invisible until it's committed, even inside that same transaction.
  75. rtxn := b.bdb.NewTransaction(false)
  76. wtxn := b.bdb.NewTransaction(true)
  77. return &badgerTransaction{
  78. badgerSnapshot: badgerSnapshot{
  79. txn: rtxn,
  80. rel: rel1,
  81. },
  82. txn: wtxn,
  83. bdb: b.bdb,
  84. rel: rel2,
  85. commitHooks: hooks,
  86. }, nil
  87. }
  88. func (b *badgerBackend) Close() error {
  89. b.closeWG.CloseWait()
  90. return wrapBadgerErr(b.bdb.Close())
  91. }
  92. func (b *badgerBackend) Get(key []byte) ([]byte, error) {
  93. if err := b.closeWG.Add(1); err != nil {
  94. return nil, err
  95. }
  96. defer b.closeWG.Done()
  97. txn := b.bdb.NewTransaction(false)
  98. defer txn.Discard()
  99. item, err := txn.Get(key)
  100. if err != nil {
  101. return nil, wrapBadgerErr(err)
  102. }
  103. val, err := item.ValueCopy(nil)
  104. if err != nil {
  105. return nil, wrapBadgerErr(err)
  106. }
  107. return val, nil
  108. }
  109. func (b *badgerBackend) NewPrefixIterator(prefix []byte) (Iterator, error) {
  110. if err := b.closeWG.Add(1); err != nil {
  111. return nil, err
  112. }
  113. txn := b.bdb.NewTransaction(false)
  114. it := badgerPrefixIterator(txn, prefix)
  115. it.releaseFn = func() {
  116. defer b.closeWG.Done()
  117. txn.Discard()
  118. }
  119. return it, nil
  120. }
  121. func (b *badgerBackend) NewRangeIterator(first, last []byte) (Iterator, error) {
  122. if err := b.closeWG.Add(1); err != nil {
  123. return nil, err
  124. }
  125. txn := b.bdb.NewTransaction(false)
  126. it := badgerRangeIterator(txn, first, last)
  127. it.releaseFn = func() {
  128. defer b.closeWG.Done()
  129. txn.Discard()
  130. }
  131. return it, nil
  132. }
  133. func (b *badgerBackend) Put(key, val []byte) error {
  134. if err := b.closeWG.Add(1); err != nil {
  135. return err
  136. }
  137. defer b.closeWG.Done()
  138. txn := b.bdb.NewTransaction(true)
  139. if err := txn.Set(key, val); err != nil {
  140. txn.Discard()
  141. return wrapBadgerErr(err)
  142. }
  143. return wrapBadgerErr(txn.Commit())
  144. }
  145. func (b *badgerBackend) Delete(key []byte) error {
  146. if err := b.closeWG.Add(1); err != nil {
  147. return err
  148. }
  149. defer b.closeWG.Done()
  150. txn := b.bdb.NewTransaction(true)
  151. if err := txn.Delete(key); err != nil {
  152. txn.Discard()
  153. return wrapBadgerErr(err)
  154. }
  155. return wrapBadgerErr(txn.Commit())
  156. }
  157. func (b *badgerBackend) Compact() error {
  158. if err := b.closeWG.Add(1); err != nil {
  159. return err
  160. }
  161. defer b.closeWG.Done()
  162. // This weird looking loop is as recommended in the README
  163. // (https://github.com/dgraph-io/badger/tree/v2.0.3#garbage-collection).
  164. // Basically, the RunValueLogGC will pick some promising thing to
  165. // garbage collect at random and return nil if it improved the
  166. // situation, then return ErrNoRewrite when there is nothing more to GC.
  167. // The 0.5 is the discard ratio, for which the method docs say they
  168. // "recommend setting discardRatio to 0.5, thus indicating that a file
  169. // be rewritten if half the space can be discarded".
  170. var err error
  171. t0 := time.Now()
  172. for err == nil {
  173. if time.Since(t0) > time.Hour {
  174. l.Warnln("Database compaction is taking a long time, performance may be impacted. Consider investigating and/or opening an issue if this warning repeats.")
  175. t0 = time.Now()
  176. }
  177. err = b.bdb.RunValueLogGC(0.5)
  178. }
  179. if errors.Is(err, badger.ErrNoRewrite) {
  180. // GC did nothing, because nothing needed to be done
  181. return nil
  182. }
  183. if errors.Is(err, badger.ErrRejected) {
  184. // GC was already running (could possibly happen), or the database
  185. // is closed (can't happen).
  186. return nil
  187. }
  188. if errors.Is(err, badger.ErrGCInMemoryMode) {
  189. // GC in in-memory mode, which is fine.
  190. return nil
  191. }
  192. return err
  193. }
  194. // badgerSnapshot implements backend.ReadTransaction
  195. type badgerSnapshot struct {
  196. txn *badger.Txn
  197. rel *releaser
  198. }
  199. func (l badgerSnapshot) Get(key []byte) ([]byte, error) {
  200. item, err := l.txn.Get(key)
  201. if err != nil {
  202. return nil, wrapBadgerErr(err)
  203. }
  204. val, err := item.ValueCopy(nil)
  205. if err != nil {
  206. return nil, wrapBadgerErr(err)
  207. }
  208. return val, nil
  209. }
  210. func (l badgerSnapshot) NewPrefixIterator(prefix []byte) (Iterator, error) {
  211. return badgerPrefixIterator(l.txn, prefix), nil
  212. }
  213. func (l badgerSnapshot) NewRangeIterator(first, last []byte) (Iterator, error) {
  214. return badgerRangeIterator(l.txn, first, last), nil
  215. }
  216. func (l badgerSnapshot) Release() {
  217. defer l.rel.Release()
  218. l.txn.Discard()
  219. }
  220. type badgerTransaction struct {
  221. badgerSnapshot
  222. txn *badger.Txn
  223. bdb *badger.DB
  224. rel *releaser
  225. size int
  226. commitHooks []CommitHook
  227. }
  228. func (t *badgerTransaction) Delete(key []byte) error {
  229. t.size += len(key)
  230. kc := make([]byte, len(key))
  231. copy(kc, key)
  232. return t.transactionRetried(func(txn *badger.Txn) error {
  233. return txn.Delete(kc)
  234. })
  235. }
  236. func (t *badgerTransaction) Put(key, val []byte) error {
  237. t.size += len(key) + len(val)
  238. kc := make([]byte, len(key))
  239. copy(kc, key)
  240. vc := make([]byte, len(val))
  241. copy(vc, val)
  242. return t.transactionRetried(func(txn *badger.Txn) error {
  243. return txn.Set(kc, vc)
  244. })
  245. }
  246. // transactionRetried performs the given operation in the current
  247. // transaction, with commit and retry if Badger says the transaction has
  248. // grown too large.
  249. func (t *badgerTransaction) transactionRetried(fn func(*badger.Txn) error) error {
  250. if err := fn(t.txn); err == badger.ErrTxnTooBig {
  251. if err := t.txn.Commit(); err != nil {
  252. return wrapBadgerErr(err)
  253. }
  254. t.size = 0
  255. t.txn = t.bdb.NewTransaction(true)
  256. return wrapBadgerErr(fn(t.txn))
  257. } else if err != nil {
  258. return wrapBadgerErr(err)
  259. }
  260. return nil
  261. }
  262. func (t *badgerTransaction) Commit() error {
  263. defer t.rel.Release()
  264. defer t.badgerSnapshot.Release()
  265. for _, hook := range t.commitHooks {
  266. if err := hook(t); err != nil {
  267. return err
  268. }
  269. }
  270. return wrapBadgerErr(t.txn.Commit())
  271. }
  272. func (t *badgerTransaction) Checkpoint() error {
  273. if t.size < checkpointFlushMinSize {
  274. return nil
  275. }
  276. for _, hook := range t.commitHooks {
  277. if err := hook(t); err != nil {
  278. return err
  279. }
  280. }
  281. err := t.txn.Commit()
  282. if err == nil {
  283. t.size = 0
  284. t.txn = t.bdb.NewTransaction(true)
  285. }
  286. return wrapBadgerErr(err)
  287. }
  288. func (t *badgerTransaction) Release() {
  289. defer t.rel.Release()
  290. defer t.badgerSnapshot.Release()
  291. t.txn.Discard()
  292. }
  293. type badgerIterator struct {
  294. it *badger.Iterator
  295. prefix []byte
  296. first []byte
  297. last []byte
  298. releaseFn func()
  299. didSeek bool
  300. err error
  301. }
  302. func (i *badgerIterator) Next() bool {
  303. if i.err != nil {
  304. return false
  305. }
  306. for {
  307. if !i.didSeek {
  308. if i.first != nil {
  309. // Range iterator
  310. i.it.Seek(i.first)
  311. } else {
  312. // Prefix iterator
  313. i.it.Seek(i.prefix)
  314. }
  315. i.didSeek = true
  316. } else {
  317. i.it.Next()
  318. }
  319. if !i.it.ValidForPrefix(i.prefix) {
  320. // Done
  321. return false
  322. }
  323. if i.first == nil && i.last == nil {
  324. // No range checks required
  325. return true
  326. }
  327. key := i.it.Item().Key()
  328. if bytes.Compare(key, i.last) > 0 {
  329. // Key is after range last
  330. return false
  331. }
  332. return true
  333. }
  334. }
  335. func (i *badgerIterator) Key() []byte {
  336. if i.err != nil {
  337. return nil
  338. }
  339. return i.it.Item().Key()
  340. }
  341. func (i *badgerIterator) Value() []byte {
  342. if i.err != nil {
  343. return nil
  344. }
  345. val, err := i.it.Item().ValueCopy(nil)
  346. if err != nil {
  347. i.err = err
  348. }
  349. return val
  350. }
  351. func (i *badgerIterator) Error() error {
  352. return wrapBadgerErr(i.err)
  353. }
  354. func (i *badgerIterator) Release() {
  355. i.it.Close()
  356. if i.releaseFn != nil {
  357. i.releaseFn()
  358. }
  359. }
  360. // wrapBadgerErr wraps errors so that the backend package can recognize them
  361. func wrapBadgerErr(err error) error {
  362. if err == nil {
  363. return nil
  364. }
  365. if err == badger.ErrDiscardedTxn {
  366. return &errClosed{}
  367. }
  368. if err == badger.ErrKeyNotFound {
  369. return &errNotFound{}
  370. }
  371. return err
  372. }
  373. func badgerPrefixIterator(txn *badger.Txn, prefix []byte) *badgerIterator {
  374. it := iteratorForPrefix(txn, prefix)
  375. return &badgerIterator{it: it, prefix: prefix}
  376. }
  377. func badgerRangeIterator(txn *badger.Txn, first, last []byte) *badgerIterator {
  378. prefix := commonPrefix(first, last)
  379. it := iteratorForPrefix(txn, prefix)
  380. return &badgerIterator{it: it, prefix: prefix, first: first, last: last}
  381. }
  382. func iteratorForPrefix(txn *badger.Txn, prefix []byte) *badger.Iterator {
  383. opts := badger.DefaultIteratorOptions
  384. opts.Prefix = prefix
  385. return txn.NewIterator(opts)
  386. }
  387. func commonPrefix(a, b []byte) []byte {
  388. minLen := len(a)
  389. if len(b) < minLen {
  390. minLen = len(b)
  391. }
  392. prefix := make([]byte, 0, minLen)
  393. for i := 0; i < minLen; i++ {
  394. if a[i] != b[i] {
  395. break
  396. }
  397. prefix = append(prefix, a[i])
  398. }
  399. return prefix
  400. }