badger_backend.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. // Copyright (C) 2019 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package backend
  7. import (
  8. "bytes"
  9. "errors"
  10. "time"
  11. badger "github.com/dgraph-io/badger/v2"
  12. )
  13. const (
  14. checkpointFlushMinSize = 128 << KiB
  15. maxCacheSize = 64 << MiB
  16. )
  17. func OpenBadger(path string) (Backend, error) {
  18. opts := badger.DefaultOptions(path)
  19. opts = opts.WithMaxCacheSize(maxCacheSize).WithCompactL0OnClose(false)
  20. opts.Logger = nil
  21. return openBadger(opts)
  22. }
  23. func OpenBadgerMemory() Backend {
  24. opts := badger.DefaultOptions("").WithInMemory(true)
  25. opts.Logger = nil
  26. backend, err := openBadger(opts)
  27. if err != nil {
  28. // Opening in-memory should never be able to fail, and is anyway
  29. // used just by tests.
  30. panic(err)
  31. }
  32. return backend
  33. }
  34. func openBadger(opts badger.Options) (Backend, error) {
  35. // XXX: We should find good values for memory utilization in the "small"
  36. // and "large" cases we support for LevelDB. Some notes here:
  37. // https://github.com/dgraph-io/badger/tree/v2.0.3#memory-usage
  38. bdb, err := badger.Open(opts)
  39. if err != nil {
  40. return nil, wrapBadgerErr(err)
  41. }
  42. return &badgerBackend{
  43. bdb: bdb,
  44. closeWG: &closeWaitGroup{},
  45. }, nil
  46. }
  47. // badgerBackend implements Backend on top of a badger
  48. type badgerBackend struct {
  49. bdb *badger.DB
  50. closeWG *closeWaitGroup
  51. }
  52. func (b *badgerBackend) NewReadTransaction() (ReadTransaction, error) {
  53. rel, err := newReleaser(b.closeWG)
  54. if err != nil {
  55. return nil, err
  56. }
  57. return badgerSnapshot{
  58. txn: b.bdb.NewTransaction(false),
  59. rel: rel,
  60. }, nil
  61. }
  62. func (b *badgerBackend) NewWriteTransaction() (WriteTransaction, error) {
  63. rel1, err := newReleaser(b.closeWG)
  64. if err != nil {
  65. return nil, err
  66. }
  67. rel2, err := newReleaser(b.closeWG)
  68. if err != nil {
  69. rel1.Release()
  70. return nil, err
  71. }
  72. // We use two transactions here to preserve the property that our
  73. // leveldb wrapper has, that writes in a transaction are completely
  74. // invisible until it's committed, even inside that same transaction.
  75. rtxn := b.bdb.NewTransaction(false)
  76. wtxn := b.bdb.NewTransaction(true)
  77. return &badgerTransaction{
  78. badgerSnapshot: badgerSnapshot{
  79. txn: rtxn,
  80. rel: rel1,
  81. },
  82. txn: wtxn,
  83. bdb: b.bdb,
  84. rel: rel2,
  85. }, nil
  86. }
  87. func (b *badgerBackend) Close() error {
  88. b.closeWG.CloseWait()
  89. return wrapBadgerErr(b.bdb.Close())
  90. }
  91. func (b *badgerBackend) Get(key []byte) ([]byte, error) {
  92. if err := b.closeWG.Add(1); err != nil {
  93. return nil, err
  94. }
  95. defer b.closeWG.Done()
  96. txn := b.bdb.NewTransaction(false)
  97. defer txn.Discard()
  98. item, err := txn.Get(key)
  99. if err != nil {
  100. return nil, wrapBadgerErr(err)
  101. }
  102. val, err := item.ValueCopy(nil)
  103. if err != nil {
  104. return nil, wrapBadgerErr(err)
  105. }
  106. return val, nil
  107. }
  108. func (b *badgerBackend) NewPrefixIterator(prefix []byte) (Iterator, error) {
  109. if err := b.closeWG.Add(1); err != nil {
  110. return nil, err
  111. }
  112. txn := b.bdb.NewTransaction(false)
  113. it := badgerPrefixIterator(txn, prefix)
  114. it.releaseFn = func() {
  115. defer b.closeWG.Done()
  116. txn.Discard()
  117. }
  118. return it, nil
  119. }
  120. func (b *badgerBackend) NewRangeIterator(first, last []byte) (Iterator, error) {
  121. if err := b.closeWG.Add(1); err != nil {
  122. return nil, err
  123. }
  124. txn := b.bdb.NewTransaction(false)
  125. it := badgerRangeIterator(txn, first, last)
  126. it.releaseFn = func() {
  127. defer b.closeWG.Done()
  128. txn.Discard()
  129. }
  130. return it, nil
  131. }
  132. func (b *badgerBackend) Put(key, val []byte) error {
  133. if err := b.closeWG.Add(1); err != nil {
  134. return err
  135. }
  136. defer b.closeWG.Done()
  137. txn := b.bdb.NewTransaction(true)
  138. if err := txn.Set(key, val); err != nil {
  139. txn.Discard()
  140. return wrapBadgerErr(err)
  141. }
  142. return wrapBadgerErr(txn.Commit())
  143. }
  144. func (b *badgerBackend) Delete(key []byte) error {
  145. if err := b.closeWG.Add(1); err != nil {
  146. return err
  147. }
  148. defer b.closeWG.Done()
  149. txn := b.bdb.NewTransaction(true)
  150. if err := txn.Delete(key); err != nil {
  151. txn.Discard()
  152. return wrapBadgerErr(err)
  153. }
  154. return wrapBadgerErr(txn.Commit())
  155. }
  156. func (b *badgerBackend) Compact() error {
  157. if err := b.closeWG.Add(1); err != nil {
  158. return err
  159. }
  160. defer b.closeWG.Done()
  161. // This weird looking loop is as recommended in the README
  162. // (https://github.com/dgraph-io/badger/tree/v2.0.3#garbage-collection).
  163. // Basically, the RunValueLogGC will pick some promising thing to
  164. // garbage collect at random and return nil if it improved the
  165. // situation, then return ErrNoRewrite when there is nothing more to GC.
  166. // The 0.5 is the discard ratio, for which the method docs say they
  167. // "recommend setting discardRatio to 0.5, thus indicating that a file
  168. // be rewritten if half the space can be discarded".
  169. var err error
  170. t0 := time.Now()
  171. for err == nil {
  172. if time.Since(t0) > time.Hour {
  173. l.Warnln("Database compaction is taking a long time, performance may be impacted. Consider investigating and/or opening an issue if this warning repeats.")
  174. t0 = time.Now()
  175. }
  176. err = b.bdb.RunValueLogGC(0.5)
  177. }
  178. if errors.Is(err, badger.ErrNoRewrite) {
  179. // GC did nothing, because nothing needed to be done
  180. return nil
  181. }
  182. if errors.Is(err, badger.ErrRejected) {
  183. // GC was already running (could possibly happen), or the database
  184. // is closed (can't happen).
  185. return nil
  186. }
  187. if errors.Is(err, badger.ErrGCInMemoryMode) {
  188. // GC in in-memory mode, which is fine.
  189. return nil
  190. }
  191. return err
  192. }
  193. // badgerSnapshot implements backend.ReadTransaction
  194. type badgerSnapshot struct {
  195. txn *badger.Txn
  196. rel *releaser
  197. }
  198. func (l badgerSnapshot) Get(key []byte) ([]byte, error) {
  199. item, err := l.txn.Get(key)
  200. if err != nil {
  201. return nil, wrapBadgerErr(err)
  202. }
  203. val, err := item.ValueCopy(nil)
  204. if err != nil {
  205. return nil, wrapBadgerErr(err)
  206. }
  207. return val, nil
  208. }
  209. func (l badgerSnapshot) NewPrefixIterator(prefix []byte) (Iterator, error) {
  210. return badgerPrefixIterator(l.txn, prefix), nil
  211. }
  212. func (l badgerSnapshot) NewRangeIterator(first, last []byte) (Iterator, error) {
  213. return badgerRangeIterator(l.txn, first, last), nil
  214. }
  215. func (l badgerSnapshot) Release() {
  216. defer l.rel.Release()
  217. l.txn.Discard()
  218. }
  219. type badgerTransaction struct {
  220. badgerSnapshot
  221. txn *badger.Txn
  222. bdb *badger.DB
  223. rel *releaser
  224. size int
  225. }
  226. func (t *badgerTransaction) Delete(key []byte) error {
  227. t.size += len(key)
  228. kc := make([]byte, len(key))
  229. copy(kc, key)
  230. return t.transactionRetried(func(txn *badger.Txn) error {
  231. return txn.Delete(kc)
  232. })
  233. }
  234. func (t *badgerTransaction) Put(key, val []byte) error {
  235. t.size += len(key) + len(val)
  236. kc := make([]byte, len(key))
  237. copy(kc, key)
  238. vc := make([]byte, len(val))
  239. copy(vc, val)
  240. return t.transactionRetried(func(txn *badger.Txn) error {
  241. return txn.Set(kc, vc)
  242. })
  243. }
  244. // transactionRetried performs the given operation in the current
  245. // transaction, with commit and retry if Badger says the transaction has
  246. // grown too large.
  247. func (t *badgerTransaction) transactionRetried(fn func(*badger.Txn) error) error {
  248. if err := fn(t.txn); err == badger.ErrTxnTooBig {
  249. if err := t.txn.Commit(); err != nil {
  250. return wrapBadgerErr(err)
  251. }
  252. t.size = 0
  253. t.txn = t.bdb.NewTransaction(true)
  254. return wrapBadgerErr(fn(t.txn))
  255. } else if err != nil {
  256. return wrapBadgerErr(err)
  257. }
  258. return nil
  259. }
  260. func (t *badgerTransaction) Commit() error {
  261. defer t.rel.Release()
  262. defer t.badgerSnapshot.Release()
  263. return wrapBadgerErr(t.txn.Commit())
  264. }
  265. func (t *badgerTransaction) Checkpoint(preFlush ...func() error) error {
  266. if t.size < checkpointFlushMinSize {
  267. return nil
  268. }
  269. for _, hook := range preFlush {
  270. if err := hook(); err != nil {
  271. return err
  272. }
  273. }
  274. err := t.txn.Commit()
  275. if err == nil {
  276. t.size = 0
  277. t.txn = t.bdb.NewTransaction(true)
  278. }
  279. return wrapBadgerErr(err)
  280. }
  281. func (t *badgerTransaction) Release() {
  282. defer t.rel.Release()
  283. defer t.badgerSnapshot.Release()
  284. t.txn.Discard()
  285. }
  286. type badgerIterator struct {
  287. it *badger.Iterator
  288. prefix []byte
  289. first []byte
  290. last []byte
  291. releaseFn func()
  292. didSeek bool
  293. err error
  294. }
  295. func (i *badgerIterator) Next() bool {
  296. if i.err != nil {
  297. return false
  298. }
  299. for {
  300. if !i.didSeek {
  301. if i.first != nil {
  302. // Range iterator
  303. i.it.Seek(i.first)
  304. } else {
  305. // Prefix iterator
  306. i.it.Seek(i.prefix)
  307. }
  308. i.didSeek = true
  309. } else {
  310. i.it.Next()
  311. }
  312. if !i.it.ValidForPrefix(i.prefix) {
  313. // Done
  314. return false
  315. }
  316. if i.first == nil && i.last == nil {
  317. // No range checks required
  318. return true
  319. }
  320. key := i.it.Item().Key()
  321. if bytes.Compare(key, i.last) > 0 {
  322. // Key is after range last
  323. return false
  324. }
  325. return true
  326. }
  327. }
  328. func (i *badgerIterator) Key() []byte {
  329. if i.err != nil {
  330. return nil
  331. }
  332. return i.it.Item().Key()
  333. }
  334. func (i *badgerIterator) Value() []byte {
  335. if i.err != nil {
  336. return nil
  337. }
  338. val, err := i.it.Item().ValueCopy(nil)
  339. if err != nil {
  340. i.err = err
  341. }
  342. return val
  343. }
  344. func (i *badgerIterator) Error() error {
  345. return wrapBadgerErr(i.err)
  346. }
  347. func (i *badgerIterator) Release() {
  348. i.it.Close()
  349. if i.releaseFn != nil {
  350. i.releaseFn()
  351. }
  352. }
  353. // wrapBadgerErr wraps errors so that the backend package can recognize them
  354. func wrapBadgerErr(err error) error {
  355. if err == nil {
  356. return nil
  357. }
  358. if err == badger.ErrDiscardedTxn {
  359. return &errClosed{}
  360. }
  361. if err == badger.ErrKeyNotFound {
  362. return &errNotFound{}
  363. }
  364. return err
  365. }
  366. func badgerPrefixIterator(txn *badger.Txn, prefix []byte) *badgerIterator {
  367. it := iteratorForPrefix(txn, prefix)
  368. return &badgerIterator{it: it, prefix: prefix}
  369. }
  370. func badgerRangeIterator(txn *badger.Txn, first, last []byte) *badgerIterator {
  371. prefix := commonPrefix(first, last)
  372. it := iteratorForPrefix(txn, prefix)
  373. return &badgerIterator{it: it, prefix: prefix, first: first, last: last}
  374. }
  375. func iteratorForPrefix(txn *badger.Txn, prefix []byte) *badger.Iterator {
  376. opts := badger.DefaultIteratorOptions
  377. opts.Prefix = prefix
  378. return txn.NewIterator(opts)
  379. }
  380. func commonPrefix(a, b []byte) []byte {
  381. minLen := len(a)
  382. if len(b) < minLen {
  383. minLen = len(b)
  384. }
  385. prefix := make([]byte, 0, minLen)
  386. for i := 0; i < minLen; i++ {
  387. if a[i] != b[i] {
  388. break
  389. }
  390. prefix = append(prefix, a[i])
  391. }
  392. return prefix
  393. }