123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- // Copyright (C) 2018 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package backend
- import (
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/iterator"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
- const (
- // Never flush transactions smaller than this, even on Checkpoint().
- // This just needs to be just large enough to avoid flushing
- // transactions when they are super tiny, thus creating millions of tiny
- // transactions unnecessarily.
- dbFlushBatchMin = 64 << KiB
- // Once a transaction reaches this size, flush it unconditionally. This
- // should be large enough to avoid forcing a flush between Checkpoint()
- // calls in loops where we do those, so in principle just large enough
- // to hold a FileInfo plus corresponding version list and metadata
- // updates or two.
- dbFlushBatchMax = 1 << MiB
- )
- // leveldbBackend implements Backend on top of a leveldb
- type leveldbBackend struct {
- ldb *leveldb.DB
- closeWG *closeWaitGroup
- location string
- }
- func newLeveldbBackend(ldb *leveldb.DB, location string) *leveldbBackend {
- return &leveldbBackend{
- ldb: ldb,
- closeWG: &closeWaitGroup{},
- location: location,
- }
- }
- func (b *leveldbBackend) NewReadTransaction() (ReadTransaction, error) {
- return b.newSnapshot()
- }
- func (b *leveldbBackend) newSnapshot() (leveldbSnapshot, error) {
- rel, err := newReleaser(b.closeWG)
- if err != nil {
- return leveldbSnapshot{}, err
- }
- snap, err := b.ldb.GetSnapshot()
- if err != nil {
- rel.Release()
- return leveldbSnapshot{}, wrapLeveldbErr(err)
- }
- return leveldbSnapshot{
- snap: snap,
- rel: rel,
- }, nil
- }
- func (b *leveldbBackend) NewWriteTransaction(hooks ...CommitHook) (WriteTransaction, error) {
- rel, err := newReleaser(b.closeWG)
- if err != nil {
- return nil, err
- }
- snap, err := b.newSnapshot()
- if err != nil {
- rel.Release()
- return nil, err // already wrapped
- }
- return &leveldbTransaction{
- leveldbSnapshot: snap,
- ldb: b.ldb,
- batch: new(leveldb.Batch),
- rel: rel,
- commitHooks: hooks,
- inFlush: false,
- }, nil
- }
- func (b *leveldbBackend) Close() error {
- b.closeWG.CloseWait()
- return wrapLeveldbErr(b.ldb.Close())
- }
- func (b *leveldbBackend) Get(key []byte) ([]byte, error) {
- val, err := b.ldb.Get(key, nil)
- return val, wrapLeveldbErr(err)
- }
- func (b *leveldbBackend) NewPrefixIterator(prefix []byte) (Iterator, error) {
- return &leveldbIterator{b.ldb.NewIterator(util.BytesPrefix(prefix), nil)}, nil
- }
- func (b *leveldbBackend) NewRangeIterator(first, last []byte) (Iterator, error) {
- return &leveldbIterator{b.ldb.NewIterator(&util.Range{Start: first, Limit: last}, nil)}, nil
- }
- func (b *leveldbBackend) Put(key, val []byte) error {
- return wrapLeveldbErr(b.ldb.Put(key, val, nil))
- }
- func (b *leveldbBackend) Delete(key []byte) error {
- return wrapLeveldbErr(b.ldb.Delete(key, nil))
- }
- func (b *leveldbBackend) Compact() error {
- // Race is detected during testing when db is closed while compaction
- // is ongoing.
- err := b.closeWG.Add(1)
- if err != nil {
- return err
- }
- defer b.closeWG.Done()
- return wrapLeveldbErr(b.ldb.CompactRange(util.Range{}))
- }
- func (b *leveldbBackend) Location() string {
- return b.location
- }
- // leveldbSnapshot implements backend.ReadTransaction
- type leveldbSnapshot struct {
- snap *leveldb.Snapshot
- rel *releaser
- }
- func (l leveldbSnapshot) Get(key []byte) ([]byte, error) {
- val, err := l.snap.Get(key, nil)
- return val, wrapLeveldbErr(err)
- }
- func (l leveldbSnapshot) NewPrefixIterator(prefix []byte) (Iterator, error) {
- return l.snap.NewIterator(util.BytesPrefix(prefix), nil), nil
- }
- func (l leveldbSnapshot) NewRangeIterator(first, last []byte) (Iterator, error) {
- return l.snap.NewIterator(&util.Range{Start: first, Limit: last}, nil), nil
- }
- func (l leveldbSnapshot) Release() {
- l.snap.Release()
- l.rel.Release()
- }
- // leveldbTransaction implements backend.WriteTransaction using a batch (not
- // an actual leveldb transaction)
- type leveldbTransaction struct {
- leveldbSnapshot
- ldb *leveldb.DB
- batch *leveldb.Batch
- rel *releaser
- commitHooks []CommitHook
- inFlush bool
- }
- func (t *leveldbTransaction) Delete(key []byte) error {
- t.batch.Delete(key)
- return t.checkFlush(dbFlushBatchMax)
- }
- func (t *leveldbTransaction) Put(key, val []byte) error {
- t.batch.Put(key, val)
- return t.checkFlush(dbFlushBatchMax)
- }
- func (t *leveldbTransaction) Checkpoint() error {
- return t.checkFlush(dbFlushBatchMin)
- }
- func (t *leveldbTransaction) Commit() error {
- err := wrapLeveldbErr(t.flush())
- t.leveldbSnapshot.Release()
- t.rel.Release()
- return err
- }
- func (t *leveldbTransaction) Release() {
- t.leveldbSnapshot.Release()
- t.rel.Release()
- }
- // checkFlush flushes and resets the batch if its size exceeds the given size.
- func (t *leveldbTransaction) checkFlush(size int) error {
- // Hooks might put values in the database, which triggers a checkFlush which might trigger a flush,
- // which might trigger the hooks.
- // Don't recurse...
- if t.inFlush || len(t.batch.Dump()) < size {
- return nil
- }
- return t.flush()
- }
- func (t *leveldbTransaction) flush() error {
- t.inFlush = true
- defer func() { t.inFlush = false }()
- for _, hook := range t.commitHooks {
- if err := hook(t); err != nil {
- return err
- }
- }
- if t.batch.Len() == 0 {
- return nil
- }
- if err := t.ldb.Write(t.batch, nil); err != nil {
- return wrapLeveldbErr(err)
- }
- t.batch.Reset()
- return nil
- }
- type leveldbIterator struct {
- iterator.Iterator
- }
- func (it *leveldbIterator) Error() error {
- return wrapLeveldbErr(it.Iterator.Error())
- }
- // wrapLeveldbErr wraps errors so that the backend package can recognize them
- func wrapLeveldbErr(err error) error {
- if err == leveldb.ErrClosed {
- return &errClosed{}
- }
- if err == leveldb.ErrNotFound {
- return &errNotFound{}
- }
- return err
- }
|