badger_backend.go 10 KB


  1. // Copyright (C) 2019 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package backend
  7. import (
  8. "bytes"
  9. "errors"
  10. "time"
  11. badger "github.com/dgraph-io/badger/v2"
  12. )
const (
	// checkpointFlushMinSize is the accumulated transaction size below which
	// badgerTransaction.Checkpoint returns without committing anything.
	// NOTE(review): KiB is not declared in this file; presumably a shift
	// amount defined elsewhere in the package -- confirm.
	checkpointFlushMinSize = 128 << KiB
	// maxCacheSize is the value passed to Badger's WithMaxCacheSize option
	// by OpenBadger.
	// NOTE(review): MiB is not declared in this file; presumably a shift
	// amount defined elsewhere in the package -- confirm.
	maxCacheSize = 64 << MiB
)
  17. func OpenBadger(path string) (Backend, error) {
  18. opts := badger.DefaultOptions(path)
  19. opts = opts.WithMaxCacheSize(maxCacheSize).WithCompactL0OnClose(false)
  20. opts.Logger = nil
  21. backend, err := openBadger(opts)
  22. if err != nil {
  23. return nil, err
  24. }
  25. backend.location = path
  26. return backend, nil
  27. }
  28. func OpenBadgerMemory() Backend {
  29. opts := badger.DefaultOptions("").WithInMemory(true)
  30. opts.Logger = nil
  31. backend, err := openBadger(opts)
  32. if err != nil {
  33. // Opening in-memory should never be able to fail, and is anyway
  34. // used just by tests.
  35. panic(err)
  36. }
  37. return backend
  38. }
  39. func openBadger(opts badger.Options) (*badgerBackend, error) {
  40. // XXX: We should find good values for memory utilization in the "small"
  41. // and "large" cases we support for LevelDB. Some notes here:
  42. // https://github.com/dgraph-io/badger/tree/v2.0.3#memory-usage
  43. bdb, err := badger.Open(opts)
  44. if err != nil {
  45. return nil, wrapBadgerErr(err)
  46. }
  47. return &badgerBackend{
  48. bdb: bdb,
  49. closeWG: &closeWaitGroup{},
  50. }, nil
  51. }
// badgerBackend implements Backend on top of a badger
type badgerBackend struct {
	// bdb is the underlying Badger database handle.
	bdb *badger.DB
	// closeWG is registered with by every operation, transaction and
	// iterator, and is waited on by Close before the database is closed.
	closeWG *closeWaitGroup
	// location is the path given to OpenBadger; it stays empty for
	// databases opened through OpenBadgerMemory.
	location string
}
  58. func (b *badgerBackend) NewReadTransaction() (ReadTransaction, error) {
  59. rel, err := newReleaser(b.closeWG)
  60. if err != nil {
  61. return nil, err
  62. }
  63. return badgerSnapshot{
  64. txn: b.bdb.NewTransaction(false),
  65. rel: rel,
  66. }, nil
  67. }
  68. func (b *badgerBackend) NewWriteTransaction(hooks ...CommitHook) (WriteTransaction, error) {
  69. rel1, err := newReleaser(b.closeWG)
  70. if err != nil {
  71. return nil, err
  72. }
  73. rel2, err := newReleaser(b.closeWG)
  74. if err != nil {
  75. rel1.Release()
  76. return nil, err
  77. }
  78. // We use two transactions here to preserve the property that our
  79. // leveldb wrapper has, that writes in a transaction are completely
  80. // invisible until it's committed, even inside that same transaction.
  81. rtxn := b.bdb.NewTransaction(false)
  82. wtxn := b.bdb.NewTransaction(true)
  83. return &badgerTransaction{
  84. badgerSnapshot: badgerSnapshot{
  85. txn: rtxn,
  86. rel: rel1,
  87. },
  88. txn: wtxn,
  89. bdb: b.bdb,
  90. rel: rel2,
  91. commitHooks: hooks,
  92. }, nil
  93. }
  94. func (b *badgerBackend) Close() error {
  95. b.closeWG.CloseWait()
  96. return wrapBadgerErr(b.bdb.Close())
  97. }
  98. func (b *badgerBackend) Get(key []byte) ([]byte, error) {
  99. if err := b.closeWG.Add(1); err != nil {
  100. return nil, err
  101. }
  102. defer b.closeWG.Done()
  103. txn := b.bdb.NewTransaction(false)
  104. defer txn.Discard()
  105. item, err := txn.Get(key)
  106. if err != nil {
  107. return nil, wrapBadgerErr(err)
  108. }
  109. val, err := item.ValueCopy(nil)
  110. if err != nil {
  111. return nil, wrapBadgerErr(err)
  112. }
  113. return val, nil
  114. }
  115. func (b *badgerBackend) NewPrefixIterator(prefix []byte) (Iterator, error) {
  116. if err := b.closeWG.Add(1); err != nil {
  117. return nil, err
  118. }
  119. txn := b.bdb.NewTransaction(false)
  120. it := badgerPrefixIterator(txn, prefix)
  121. it.releaseFn = func() {
  122. defer b.closeWG.Done()
  123. txn.Discard()
  124. }
  125. return it, nil
  126. }
  127. func (b *badgerBackend) NewRangeIterator(first, last []byte) (Iterator, error) {
  128. if err := b.closeWG.Add(1); err != nil {
  129. return nil, err
  130. }
  131. txn := b.bdb.NewTransaction(false)
  132. it := badgerRangeIterator(txn, first, last)
  133. it.releaseFn = func() {
  134. defer b.closeWG.Done()
  135. txn.Discard()
  136. }
  137. return it, nil
  138. }
  139. func (b *badgerBackend) Put(key, val []byte) error {
  140. if err := b.closeWG.Add(1); err != nil {
  141. return err
  142. }
  143. defer b.closeWG.Done()
  144. txn := b.bdb.NewTransaction(true)
  145. if err := txn.Set(key, val); err != nil {
  146. txn.Discard()
  147. return wrapBadgerErr(err)
  148. }
  149. return wrapBadgerErr(txn.Commit())
  150. }
  151. func (b *badgerBackend) Delete(key []byte) error {
  152. if err := b.closeWG.Add(1); err != nil {
  153. return err
  154. }
  155. defer b.closeWG.Done()
  156. txn := b.bdb.NewTransaction(true)
  157. if err := txn.Delete(key); err != nil {
  158. txn.Discard()
  159. return wrapBadgerErr(err)
  160. }
  161. return wrapBadgerErr(txn.Commit())
  162. }
  163. func (b *badgerBackend) Compact() error {
  164. if err := b.closeWG.Add(1); err != nil {
  165. return err
  166. }
  167. defer b.closeWG.Done()
  168. // This weird looking loop is as recommended in the README
  169. // (https://github.com/dgraph-io/badger/tree/v2.0.3#garbage-collection).
  170. // Basically, the RunValueLogGC will pick some promising thing to
  171. // garbage collect at random and return nil if it improved the
  172. // situation, then return ErrNoRewrite when there is nothing more to GC.
  173. // The 0.5 is the discard ratio, for which the method docs say they
  174. // "recommend setting discardRatio to 0.5, thus indicating that a file
  175. // be rewritten if half the space can be discarded".
  176. var err error
  177. t0 := time.Now()
  178. for err == nil {
  179. if time.Since(t0) > time.Hour {
  180. l.Warnln("Database compaction is taking a long time, performance may be impacted. Consider investigating and/or opening an issue if this warning repeats.")
  181. t0 = time.Now()
  182. }
  183. err = b.bdb.RunValueLogGC(0.5)
  184. }
  185. if errors.Is(err, badger.ErrNoRewrite) {
  186. // GC did nothing, because nothing needed to be done
  187. return nil
  188. }
  189. if errors.Is(err, badger.ErrRejected) {
  190. // GC was already running (could possibly happen), or the database
  191. // is closed (can't happen).
  192. return nil
  193. }
  194. if errors.Is(err, badger.ErrGCInMemoryMode) {
  195. // GC in in-memory mode, which is fine.
  196. return nil
  197. }
  198. return err
  199. }
  200. func (b *badgerBackend) Location() string {
  201. return b.location
  202. }
// badgerSnapshot implements backend.ReadTransaction
type badgerSnapshot struct {
	// txn is the Badger transaction all reads and iterators go through.
	txn *badger.Txn
	// rel is released when the snapshot is released.
	rel *releaser
}
  208. func (l badgerSnapshot) Get(key []byte) ([]byte, error) {
  209. item, err := l.txn.Get(key)
  210. if err != nil {
  211. return nil, wrapBadgerErr(err)
  212. }
  213. val, err := item.ValueCopy(nil)
  214. if err != nil {
  215. return nil, wrapBadgerErr(err)
  216. }
  217. return val, nil
  218. }
  219. func (l badgerSnapshot) NewPrefixIterator(prefix []byte) (Iterator, error) {
  220. return badgerPrefixIterator(l.txn, prefix), nil
  221. }
  222. func (l badgerSnapshot) NewRangeIterator(first, last []byte) (Iterator, error) {
  223. return badgerRangeIterator(l.txn, first, last), nil
  224. }
  225. func (l badgerSnapshot) Release() {
  226. defer l.rel.Release()
  227. l.txn.Discard()
  228. }
// badgerTransaction is the write transaction handed out by
// badgerBackend.NewWriteTransaction. Reads go through the embedded
// badgerSnapshot, writes through a separate Badger transaction.
type badgerTransaction struct {
	// badgerSnapshot is the read side; it carries its own transaction and
	// releaser.
	badgerSnapshot
	// txn is the current Badger write transaction. It is replaced with a
	// fresh one after a successful Checkpoint commit or a too-big retry.
	txn *badger.Txn
	// bdb is used to start replacement write transactions.
	bdb *badger.DB
	// rel is the write side's releaser, released by Commit and Release.
	rel *releaser
	// size is the sum of key and value lengths written since the last
	// commit; Checkpoint compares it against checkpointFlushMinSize.
	size int
	// commitHooks are run, in order, before Commit and Checkpoint commit.
	commitHooks []CommitHook
}
  237. func (t *badgerTransaction) Delete(key []byte) error {
  238. t.size += len(key)
  239. kc := make([]byte, len(key))
  240. copy(kc, key)
  241. return t.transactionRetried(func(txn *badger.Txn) error {
  242. return txn.Delete(kc)
  243. })
  244. }
  245. func (t *badgerTransaction) Put(key, val []byte) error {
  246. t.size += len(key) + len(val)
  247. kc := make([]byte, len(key))
  248. copy(kc, key)
  249. vc := make([]byte, len(val))
  250. copy(vc, val)
  251. return t.transactionRetried(func(txn *badger.Txn) error {
  252. return txn.Set(kc, vc)
  253. })
  254. }
  255. // transactionRetried performs the given operation in the current
  256. // transaction, with commit and retry if Badger says the transaction has
  257. // grown too large.
  258. func (t *badgerTransaction) transactionRetried(fn func(*badger.Txn) error) error {
  259. if err := fn(t.txn); err == badger.ErrTxnTooBig {
  260. if err := t.txn.Commit(); err != nil {
  261. return wrapBadgerErr(err)
  262. }
  263. t.size = 0
  264. t.txn = t.bdb.NewTransaction(true)
  265. return wrapBadgerErr(fn(t.txn))
  266. } else if err != nil {
  267. return wrapBadgerErr(err)
  268. }
  269. return nil
  270. }
  271. func (t *badgerTransaction) Commit() error {
  272. defer t.rel.Release()
  273. defer t.badgerSnapshot.Release()
  274. for _, hook := range t.commitHooks {
  275. if err := hook(t); err != nil {
  276. return err
  277. }
  278. }
  279. return wrapBadgerErr(t.txn.Commit())
  280. }
  281. func (t *badgerTransaction) Checkpoint() error {
  282. if t.size < checkpointFlushMinSize {
  283. return nil
  284. }
  285. for _, hook := range t.commitHooks {
  286. if err := hook(t); err != nil {
  287. return err
  288. }
  289. }
  290. err := t.txn.Commit()
  291. if err == nil {
  292. t.size = 0
  293. t.txn = t.bdb.NewTransaction(true)
  294. }
  295. return wrapBadgerErr(err)
  296. }
  297. func (t *badgerTransaction) Release() {
  298. defer t.rel.Release()
  299. defer t.badgerSnapshot.Release()
  300. t.txn.Discard()
  301. }
  302. type badgerIterator struct {
  303. it *badger.Iterator
  304. prefix []byte
  305. first []byte
  306. last []byte
  307. releaseFn func()
  308. released bool
  309. didSeek bool
  310. err error
  311. }
  312. func (i *badgerIterator) Next() bool {
  313. if i.err != nil {
  314. return false
  315. }
  316. for {
  317. if !i.didSeek {
  318. if i.first != nil {
  319. // Range iterator
  320. i.it.Seek(i.first)
  321. } else {
  322. // Prefix iterator
  323. i.it.Seek(i.prefix)
  324. }
  325. i.didSeek = true
  326. } else {
  327. i.it.Next()
  328. }
  329. if !i.it.ValidForPrefix(i.prefix) {
  330. // Done
  331. return false
  332. }
  333. if i.first == nil && i.last == nil {
  334. // No range checks required
  335. return true
  336. }
  337. key := i.it.Item().Key()
  338. if bytes.Compare(key, i.last) > 0 {
  339. // Key is after range last
  340. return false
  341. }
  342. return true
  343. }
  344. }
  345. func (i *badgerIterator) Key() []byte {
  346. if i.err != nil {
  347. return nil
  348. }
  349. return i.it.Item().Key()
  350. }
  351. func (i *badgerIterator) Value() []byte {
  352. if i.err != nil {
  353. return nil
  354. }
  355. val, err := i.it.Item().ValueCopy(nil)
  356. if err != nil {
  357. i.err = err
  358. }
  359. return val
  360. }
  361. func (i *badgerIterator) Error() error {
  362. return wrapBadgerErr(i.err)
  363. }
  364. func (i *badgerIterator) Release() {
  365. if i.released {
  366. // We already closed this iterator, no need to do it again
  367. // (and the releaseFn might hang if we do).
  368. return
  369. }
  370. i.released = true
  371. i.it.Close()
  372. if i.releaseFn != nil {
  373. i.releaseFn()
  374. }
  375. }
  376. // wrapBadgerErr wraps errors so that the backend package can recognize them
  377. func wrapBadgerErr(err error) error {
  378. if err == nil {
  379. return nil
  380. }
  381. if err == badger.ErrDiscardedTxn {
  382. return &errClosed{}
  383. }
  384. if err == badger.ErrKeyNotFound {
  385. return &errNotFound{}
  386. }
  387. return err
  388. }
  389. func badgerPrefixIterator(txn *badger.Txn, prefix []byte) *badgerIterator {
  390. it := iteratorForPrefix(txn, prefix)
  391. return &badgerIterator{it: it, prefix: prefix}
  392. }
  393. func badgerRangeIterator(txn *badger.Txn, first, last []byte) *badgerIterator {
  394. prefix := commonPrefix(first, last)
  395. it := iteratorForPrefix(txn, prefix)
  396. return &badgerIterator{it: it, prefix: prefix, first: first, last: last}
  397. }
  398. func iteratorForPrefix(txn *badger.Txn, prefix []byte) *badger.Iterator {
  399. opts := badger.DefaultIteratorOptions
  400. opts.Prefix = prefix
  401. return txn.NewIterator(opts)
  402. }
  403. func commonPrefix(a, b []byte) []byte {
  404. minLen := len(a)
  405. if len(b) < minLen {
  406. minLen = len(b)
  407. }
  408. prefix := make([]byte, 0, minLen)
  409. for i := 0; i < minLen; i++ {
  410. if a[i] != b[i] {
  411. break
  412. }
  413. prefix = append(prefix, a[i])
  414. }
  415. return prefix
  416. }