|
|
@@ -39,6 +39,7 @@ type Lowlevel struct {
|
|
|
deviceIdx *smallIndex
|
|
|
closed bool
|
|
|
closeMut *sync.RWMutex
|
|
|
+ iterWG sync.WaitGroup
|
|
|
}
|
|
|
|
|
|
// Open attempts to open the database at the given location, and runs
|
|
|
@@ -99,42 +100,76 @@ func (db *Lowlevel) Committed() int64 {
|
|
|
}
|
|
|
|
|
|
func (db *Lowlevel) Put(key, val []byte, wo *opt.WriteOptions) error {
|
|
|
+ db.closeMut.RLock()
|
|
|
+ defer db.closeMut.RUnlock()
|
|
|
+ if db.closed {
|
|
|
+ return leveldb.ErrClosed
|
|
|
+ }
|
|
|
atomic.AddInt64(&db.committed, 1)
|
|
|
return db.DB.Put(key, val, wo)
|
|
|
}
|
|
|
|
|
|
+func (db *Lowlevel) Write(batch *leveldb.Batch, wo *opt.WriteOptions) error {
|
|
|
+ db.closeMut.RLock()
|
|
|
+ defer db.closeMut.RUnlock()
|
|
|
+ if db.closed {
|
|
|
+ return leveldb.ErrClosed
|
|
|
+ }
|
|
|
+ return db.DB.Write(batch, wo)
|
|
|
+}
|
|
|
+
|
|
|
func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error {
|
|
|
+ db.closeMut.RLock()
|
|
|
+ defer db.closeMut.RUnlock()
|
|
|
+ if db.closed {
|
|
|
+ return leveldb.ErrClosed
|
|
|
+ }
|
|
|
atomic.AddInt64(&db.committed, 1)
|
|
|
return db.DB.Delete(key, wo)
|
|
|
}
|
|
|
|
|
|
func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
|
|
+ return db.newIterator(func() iterator.Iterator { return db.DB.NewIterator(slice, ro) })
|
|
|
+}
|
|
|
+
|
|
|
+// newIterator returns an iterator created with the given constructor only if db
|
|
|
+// is not yet closed. If it is closed, a closedIter is returned instead.
|
|
|
+func (db *Lowlevel) newIterator(constr func() iterator.Iterator) iterator.Iterator {
|
|
|
db.closeMut.RLock()
|
|
|
defer db.closeMut.RUnlock()
|
|
|
if db.closed {
|
|
|
return &closedIter{}
|
|
|
}
|
|
|
- return db.DB.NewIterator(slice, ro)
|
|
|
+ db.iterWG.Add(1)
|
|
|
+ return &iter{
|
|
|
+ Iterator: constr(),
|
|
|
+ db: db,
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (db *Lowlevel) GetSnapshot() snapshot {
|
|
|
- snap, err := db.DB.GetSnapshot()
|
|
|
+ s, err := db.DB.GetSnapshot()
|
|
|
if err != nil {
|
|
|
if err == leveldb.ErrClosed {
|
|
|
return &closedSnap{}
|
|
|
}
|
|
|
panic(err)
|
|
|
}
|
|
|
- return snap
|
|
|
+ return &snap{
|
|
|
+ Snapshot: s,
|
|
|
+ db: db,
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (db *Lowlevel) Close() {
|
|
|
db.closeMut.Lock()
|
|
|
- defer db.closeMut.Unlock()
|
|
|
if db.closed {
|
|
|
+ db.closeMut.Unlock()
|
|
|
return
|
|
|
}
|
|
|
db.closed = true
|
|
|
+ db.closeMut.Unlock()
|
|
|
+ db.iterWG.Wait()
|
|
|
db.DB.Close()
|
|
|
}
|
|
|
|
|
|
@@ -146,6 +181,7 @@ func NewLowlevel(db *leveldb.DB, location string) *Lowlevel {
|
|
|
folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
|
|
|
deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
|
|
|
closeMut: &sync.RWMutex{},
|
|
|
+ iterWG: sync.WaitGroup{},
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -220,3 +256,51 @@ func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterato
|
|
|
return &closedIter{}
|
|
|
}
|
|
|
func (s *closedSnap) Release() {}
|
|
|
+
|
|
|
+type snap struct {
|
|
|
+ *leveldb.Snapshot
|
|
|
+ db *Lowlevel
|
|
|
+}
|
|
|
+
|
|
|
+func (s *snap) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
|
|
+ return s.db.newIterator(func() iterator.Iterator { return s.Snapshot.NewIterator(slice, ro) })
|
|
|
+}
|
|
|
+
|
|
|
+// iter implements iterator.Iterator which allows tracking active iterators
|
|
|
+// and aborts if the underlying database is being closed.
|
|
|
+type iter struct {
|
|
|
+ iterator.Iterator
|
|
|
+ db *Lowlevel
|
|
|
+}
|
|
|
+
|
|
|
+func (it *iter) Release() {
|
|
|
+ it.db.iterWG.Done()
|
|
|
+ it.Iterator.Release()
|
|
|
+}
|
|
|
+
|
|
|
+func (it *iter) Next() bool {
|
|
|
+ return it.execIfNotClosed(it.Iterator.Next)
|
|
|
+}
|
|
|
+func (it *iter) Prev() bool {
|
|
|
+ return it.execIfNotClosed(it.Iterator.Prev)
|
|
|
+}
|
|
|
+func (it *iter) First() bool {
|
|
|
+ return it.execIfNotClosed(it.Iterator.First)
|
|
|
+}
|
|
|
+func (it *iter) Last() bool {
|
|
|
+ return it.execIfNotClosed(it.Iterator.Last)
|
|
|
+}
|
|
|
+func (it *iter) Seek(key []byte) bool {
|
|
|
+ return it.execIfNotClosed(func() bool {
|
|
|
+ return it.Iterator.Seek(key)
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func (it *iter) execIfNotClosed(fn func() bool) bool {
|
|
|
+ it.db.closeMut.RLock()
|
|
|
+ defer it.db.closeMut.RUnlock()
|
|
|
+ if it.db.closed {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return fn()
|
|
|
+}
|