@@ -7,6 +7,7 @@
 package leveldb
 
 import (
+	"bytes"
 	"container/list"
 	crand "crypto/rand"
 	"encoding/binary"
@@ -23,6 +24,7 @@ import (
 	"unsafe"
 
 	"github.com/syndtr/goleveldb/leveldb/comparer"
+	"github.com/syndtr/goleveldb/leveldb/errors"
 	"github.com/syndtr/goleveldb/leveldb/filter"
 	"github.com/syndtr/goleveldb/leveldb/iterator"
 	"github.com/syndtr/goleveldb/leveldb/opt"
@@ -151,7 +153,10 @@ func (h *dbHarness) maxNextLevelOverlappingBytes(want uint64) {
 	t := h.t
 	db := h.db
 
-	var res uint64
+	var (
+		maxOverlaps uint64
+		maxLevel    int
+	)
 	v := db.s.version()
 	for i, tt := range v.tables[1 : len(v.tables)-1] {
 		level := i + 1
@@ -159,15 +164,18 @@ func (h *dbHarness) maxNextLevelOverlappingBytes(want uint64) {
 		for _, t := range tt {
 			r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
 			sum := r.size()
-			if sum > res {
-				res = sum
+			if sum > maxOverlaps {
+				maxOverlaps = sum
+				maxLevel = level
 			}
 		}
 	}
 	v.release()
 
-	if res > want {
-		t.Errorf("next level overlapping bytes is more than %d, got=%d", want, res)
+	if maxOverlaps > want {
+		t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel)
+	} else {
+		t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want)
 	}
 }
 
@@ -240,7 +248,7 @@ func (h *dbHarness) allEntriesFor(key, want string) {
 	db := h.db
 	s := db.s
 
-	ikey := newIKey([]byte(key), kMaxSeq, tVal)
+	ikey := newIkey([]byte(key), kMaxSeq, ktVal)
 	iter := db.newRawIterator(nil, nil)
 	if !iter.Seek(ikey) && iter.Error() != nil {
 		t.Error("AllEntries: error during seek, err: ", iter.Error())
@@ -249,19 +257,18 @@ func (h *dbHarness) allEntriesFor(key, want string) {
 	res := "[ "
 	first := true
 	for iter.Valid() {
-		rkey := iKey(iter.Key())
-		if _, t, ok := rkey.parseNum(); ok {
-			if s.icmp.uCompare(ikey.ukey(), rkey.ukey()) != 0 {
+		if ukey, _, kt, kerr := parseIkey(iter.Key()); kerr == nil {
+			if s.icmp.uCompare(ikey.ukey(), ukey) != 0 {
 				break
 			}
 			if !first {
 				res += ", "
 			}
 			first = false
-			switch t {
-			case tVal:
+			switch kt {
+			case ktVal:
 				res += string(iter.Value())
-			case tDel:
+			case ktDel:
 				res += "DEL"
 			}
 		} else {
@@ -326,6 +333,8 @@ func (h *dbHarness) compactMem() {
 	t := h.t
 	db := h.db
 
+	t.Log("starting memdb compaction")
+
 	db.writeLockC <- struct{}{}
 	defer func() {
 		<-db.writeLockC
@@ -341,6 +350,8 @@ func (h *dbHarness) compactMem() {
 	if h.totalTables() == 0 {
 		t.Error("zero tables after mem compaction")
 	}
+
+	t.Log("memdb compaction done")
 }
 
 func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) {
@@ -355,6 +366,8 @@ func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool)
 		_max = []byte(max)
 	}
 
+	t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max)
+
 	if err := db.compSendRange(db.tcompCmdC, level, _min, _max); err != nil {
 		if wanterr {
 			t.Log("CompactRangeAt: got error (expected): ", err)
@@ -364,6 +377,8 @@ func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool)
 	} else if wanterr {
 		t.Error("CompactRangeAt: expect error")
 	}
+
+	t.Log("table range compaction done")
 }
 
 func (h *dbHarness) compactRangeAt(level int, min, max string) {
@@ -374,6 +389,8 @@ func (h *dbHarness) compactRange(min, max string) {
 	t := h.t
 	db := h.db
 
+	t.Logf("starting DB range compaction: min=%q, max=%q", min, max)
+
 	var r util.Range
 	if min != "" {
 		r.Start = []byte(min)
@@ -384,6 +401,8 @@ func (h *dbHarness) compactRange(min, max string) {
 	if err := db.CompactRange(r); err != nil {
 		t.Error("CompactRange: got error: ", err)
 	}
+
+	t.Log("DB range compaction done")
 }
 
 func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) {
@@ -505,10 +524,10 @@ func Test_FieldsAligned(t *testing.T) {
 	p1 := new(DB)
 	testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq))
 	p2 := new(session)
-	testAligned(t, "session.stFileNum", unsafe.Offsetof(p2.stFileNum))
+	testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum))
 	testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum))
 	testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum))
-	testAligned(t, "session.stSeq", unsafe.Offsetof(p2.stSeq))
+	testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum))
 }
 
 func TestDb_Locking(t *testing.T) {
@@ -944,7 +963,7 @@ func TestDb_RepeatedWritesToSameKey(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100000})
 	defer h.close()
 
-	maxTables := kNumLevels + kL0_StopWritesTrigger
+	maxTables := h.o.GetNumLevel() + h.o.GetWriteL0PauseTrigger()
 
 	value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
 	for i := 0; i < 5*maxTables; i++ {
@@ -962,7 +981,7 @@ func TestDb_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
 
 	h.reopenDB()
 
-	maxTables := kNumLevels + kL0_StopWritesTrigger
+	maxTables := h.o.GetNumLevel() + h.o.GetWriteL0PauseTrigger()
 
 	value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
 	for i := 0; i < 5*maxTables; i++ {
@@ -978,7 +997,7 @@ func TestDb_SparseMerge(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
 	defer h.close()
 
-	h.putMulti(kNumLevels, "A", "Z")
+	h.putMulti(h.o.GetNumLevel(), "A", "Z")
 
 	// Suppose there is:
 	// small amount of data with prefix A
@@ -1002,6 +1021,7 @@ func TestDb_SparseMerge(t *testing.T) {
 	h.put("C", "vc2")
 	h.compactMem()
 
+	h.waitCompaction()
 	h.maxNextLevelOverlappingBytes(20 * 1048576)
 	h.compactRangeAt(0, "", "")
 	h.waitCompaction()
@@ -1172,7 +1192,7 @@ func TestDb_HiddenValuesAreRemoved(t *testing.T) {
 
 		h.put("foo", "v1")
 		h.compactMem()
-		m := kMaxMemCompactLevel
+		m := h.o.GetMaxMemCompationLevel()
 		v := s.version()
 		num := v.tLen(m)
 		v.release()
@@ -1216,7 +1236,7 @@ func TestDb_DeletionMarkers2(t *testing.T) {
 
 	h.put("foo", "v1")
 	h.compactMem()
-	m := kMaxMemCompactLevel
+	m := h.o.GetMaxMemCompationLevel()
 	v := s.version()
 	num := v.tLen(m)
 	v.release()
@@ -1269,14 +1289,14 @@ func TestDb_CompactionTableOpenError(t *testing.T) {
 		t.Errorf("total tables is %d, want %d", n, im)
 	}
 
-	h.stor.SetOpenErr(storage.TypeTable)
+	h.stor.SetEmuErr(storage.TypeTable, tsOpOpen)
 	go h.db.CompactRange(util.Range{})
 	if err := h.db.compSendIdle(h.db.tcompCmdC); err != nil {
 		t.Log("compaction error: ", err)
 	}
 	h.closeDB0()
 	h.openDB()
-	h.stor.SetOpenErr(0)
+	h.stor.SetEmuErr(0, tsOpOpen)
 
 	for i := 0; i < im; i++ {
 		for j := 0; j < jm; j++ {
@@ -1287,7 +1307,7 @@ func TestDb_CompactionTableOpenError(t *testing.T) {
 
 func TestDb_OverlapInLevel0(t *testing.T) {
 	trun(t, func(h *dbHarness) {
-		if kMaxMemCompactLevel != 2 {
+		if h.o.GetMaxMemCompationLevel() != 2 {
 			t.Fatal("fix test to reflect the config")
 		}
 
@@ -1407,23 +1427,23 @@ func TestDb_ManifestWriteError(t *testing.T) {
 		h.compactMem()
 		h.getVal("foo", "bar")
 		v := h.db.s.version()
-		if n := v.tLen(kMaxMemCompactLevel); n != 1 {
+		if n := v.tLen(h.o.GetMaxMemCompationLevel()); n != 1 {
 			t.Errorf("invalid total tables, want=1 got=%d", n)
 		}
 		v.release()
 
 		if i == 0 {
-			h.stor.SetWriteErr(storage.TypeManifest)
+			h.stor.SetEmuErr(storage.TypeManifest, tsOpWrite)
 		} else {
-			h.stor.SetSyncErr(storage.TypeManifest)
+			h.stor.SetEmuErr(storage.TypeManifest, tsOpSync)
 		}
 
 		// Merging compaction (will fail)
-		h.compactRangeAtErr(kMaxMemCompactLevel, "", "", true)
+		h.compactRangeAtErr(h.o.GetMaxMemCompationLevel(), "", "", true)
 
 		h.db.Close()
-		h.stor.SetWriteErr(0)
-		h.stor.SetSyncErr(0)
+		h.stor.SetEmuErr(0, tsOpWrite)
+		h.stor.SetEmuErr(0, tsOpSync)
 
 		// Should not lose data
 		h.openDB()
@@ -1573,7 +1593,7 @@ func TestDb_ManualCompaction(t *testing.T) {
 	h := newDbHarness(t)
 	defer h.close()
 
-	if kMaxMemCompactLevel != 2 {
+	if h.o.GetMaxMemCompationLevel() != 2 {
 		t.Fatal("fix test to reflect the config")
 	}
 
@@ -1857,7 +1877,7 @@ func TestDb_DeletionMarkersOnMemdb(t *testing.T) {
 }
 
 func TestDb_LeveldbIssue178(t *testing.T) {
-	nKeys := (kMaxTableSize / 30) * 5
+	nKeys := (opt.DefaultCompactionTableSize / 30) * 5
 	key1 := func(i int) string {
 		return fmt.Sprintf("my_key_%d", i)
 	}
@@ -2125,7 +2145,7 @@ func TestDb_GoleveldbIssue72and83(t *testing.T) {
 				}
 			}
 			if err := iter.Error(); err != nil {
-				t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, err)
+				t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
 			}
 			iter.Release()
 			snap.Release()
@@ -2164,5 +2184,385 @@ func TestDb_GoleveldbIssue72and83(t *testing.T) {
 	}()
 
 	wg.Wait()
+}
+
+func TestDb_TransientError(t *testing.T) {
+	h := newDbHarnessWopt(t, &opt.Options{
+		WriteBuffer:              128 * opt.KiB,
+		CachedOpenFiles:          3,
+		DisableCompactionBackoff: true,
+	})
+	defer h.close()
+
+	const (
+		nSnap = 20
+		nKey  = 10000
+	)
+
+	var (
+		snaps [nSnap]*Snapshot
+		b     = &Batch{}
+	)
+	for i := range snaps {
+		vtail := fmt.Sprintf("VAL%030d", i)
+		b.Reset()
+		for k := 0; k < nKey; k++ {
+			key := fmt.Sprintf("KEY%8d", k)
+			b.Put([]byte(key), []byte(key+vtail))
+		}
+		h.stor.SetEmuRandErr(storage.TypeTable, tsOpOpen, tsOpRead, tsOpReadAt)
+		if err := h.db.Write(b, nil); err != nil {
+			t.Logf("WRITE #%d error: %v", i, err)
+			h.stor.SetEmuRandErr(0, tsOpOpen, tsOpRead, tsOpReadAt, tsOpWrite)
+			for {
+				if err := h.db.Write(b, nil); err == nil {
+					break
+				} else if errors.IsCorrupted(err) {
+					t.Fatalf("WRITE #%d corrupted: %v", i, err)
+				}
+			}
+		}
+
+		snaps[i] = h.db.newSnapshot()
+		b.Reset()
+		for k := 0; k < nKey; k++ {
+			key := fmt.Sprintf("KEY%8d", k)
+			b.Delete([]byte(key))
+		}
+		h.stor.SetEmuRandErr(storage.TypeTable, tsOpOpen, tsOpRead, tsOpReadAt)
+		if err := h.db.Write(b, nil); err != nil {
+			t.Logf("WRITE #%d error: %v", i, err)
+			h.stor.SetEmuRandErr(0, tsOpOpen, tsOpRead, tsOpReadAt)
+			for {
+				if err := h.db.Write(b, nil); err == nil {
+					break
+				} else if errors.IsCorrupted(err) {
+					t.Fatalf("WRITE #%d corrupted: %v", i, err)
+				}
+			}
+		}
+	}
+	h.stor.SetEmuRandErr(0, tsOpOpen, tsOpRead, tsOpReadAt)
+
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	rnd := rand.New(rand.NewSource(0xecafdaed))
+	wg := &sync.WaitGroup{}
+	for i, snap := range snaps {
+		wg.Add(2)
+
+		go func(i int, snap *Snapshot, sk []int) {
+			defer wg.Done()
+
+			vtail := fmt.Sprintf("VAL%030d", i)
+			for _, k := range sk {
+				key := fmt.Sprintf("KEY%8d", k)
+				xvalue, err := snap.Get([]byte(key), nil)
+				if err != nil {
+					t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
+				}
+				value := key + vtail
+				if !bytes.Equal([]byte(value), xvalue) {
+					t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
+				}
+			}
+		}(i, snap, rnd.Perm(nKey))
+
+		go func(i int, snap *Snapshot) {
+			defer wg.Done()
+
+			vtail := fmt.Sprintf("VAL%030d", i)
+			iter := snap.NewIterator(nil, nil)
+			defer iter.Release()
+			for k := 0; k < nKey; k++ {
+				if !iter.Next() {
+					if err := iter.Error(); err != nil {
+						t.Fatalf("READER_ITER #%d K%d error: %v", i, k, err)
+					} else {
+						t.Fatalf("READER_ITER #%d K%d eoi", i, k)
+					}
+				}
+				key := fmt.Sprintf("KEY%8d", k)
+				xkey := iter.Key()
+				if !bytes.Equal([]byte(key), xkey) {
+					t.Fatalf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
+				}
+				value := key + vtail
+				xvalue := iter.Value()
+				if !bytes.Equal([]byte(value), xvalue) {
+					t.Fatalf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
+				}
+			}
+		}(i, snap)
+	}
+
+	wg.Wait()
+}
+
+func TestDb_UkeyShouldntHopAcrossTable(t *testing.T) {
+	h := newDbHarnessWopt(t, &opt.Options{
+		WriteBuffer:                 112 * opt.KiB,
+		CompactionTableSize:         90 * opt.KiB,
+		CompactionExpandLimitFactor: 1,
+	})
+	defer h.close()
+
+	const (
+		nSnap = 190
+		nKey  = 140
+	)
+
+	var (
+		snaps [nSnap]*Snapshot
+		b     = &Batch{}
+	)
+	for i := range snaps {
+		vtail := fmt.Sprintf("VAL%030d", i)
+		b.Reset()
+		for k := 0; k < nKey; k++ {
+			key := fmt.Sprintf("KEY%08d", k)
+			b.Put([]byte(key), []byte(key+vtail))
+		}
+		if err := h.db.Write(b, nil); err != nil {
+			t.Fatalf("WRITE #%d error: %v", i, err)
+		}
+
+		snaps[i] = h.db.newSnapshot()
+		b.Reset()
+		for k := 0; k < nKey; k++ {
+			key := fmt.Sprintf("KEY%08d", k)
+			b.Delete([]byte(key))
+		}
+		if err := h.db.Write(b, nil); err != nil {
+			t.Fatalf("WRITE #%d error: %v", i, err)
+		}
+	}
+
+	h.compactMem()
+
+	h.waitCompaction()
+	for level, tables := range h.db.s.stVersion.tables {
+		for _, table := range tables {
+			t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+		}
+	}
+
+	h.compactRangeAt(0, "", "")
+	h.waitCompaction()
+	for level, tables := range h.db.s.stVersion.tables {
+		for _, table := range tables {
+			t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+		}
+	}
+	h.compactRangeAt(1, "", "")
+	h.waitCompaction()
+	for level, tables := range h.db.s.stVersion.tables {
+		for _, table := range tables {
+			t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+		}
+	}
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	wg := &sync.WaitGroup{}
+	for i, snap := range snaps {
+		wg.Add(1)
+
+		go func(i int, snap *Snapshot) {
+			defer wg.Done()
+
+			vtail := fmt.Sprintf("VAL%030d", i)
+			for k := 0; k < nKey; k++ {
+				key := fmt.Sprintf("KEY%08d", k)
+				xvalue, err := snap.Get([]byte(key), nil)
+				if err != nil {
+					t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
+				}
+				value := key + vtail
+				if !bytes.Equal([]byte(value), xvalue) {
+					t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
+				}
+			}
+		}(i, snap)
+	}
+
+	wg.Wait()
+}
+
+func TestDb_TableCompactionBuilder(t *testing.T) {
+	stor := newTestStorage(t)
+	defer stor.Close()
+
+	const nSeq = 99
+
+	o := &opt.Options{
+		WriteBuffer:                 112 * opt.KiB,
+		CompactionTableSize:         43 * opt.KiB,
+		CompactionExpandLimitFactor: 1,
+		CompactionGPOverlapsFactor:  1,
+		BlockCache:                  opt.NoCache,
+	}
+	s, err := newSession(stor, o)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := s.create(); err != nil {
+		t.Fatal(err)
+	}
+	defer s.close()
+	var (
+		seq        uint64
+		targetSize = 5 * o.CompactionTableSize
+		value      = bytes.Repeat([]byte{'0'}, 100)
+	)
+	for i := 0; i < 2; i++ {
+		tw, err := s.tops.create()
+		if err != nil {
+			t.Fatal(err)
+		}
+		for k := 0; tw.tw.BytesLen() < targetSize; k++ {
+			key := []byte(fmt.Sprintf("%09d", k))
+			seq += nSeq - 1
+			for x := uint64(0); x < nSeq; x++ {
+				if err := tw.append(newIkey(key, seq-x, ktVal), value); err != nil {
+					t.Fatal(err)
+				}
+			}
+		}
+		tf, err := tw.finish()
+		if err != nil {
+			t.Fatal(err)
+		}
+		rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
+		rec.addTableFile(i, tf)
+		if err := s.commit(rec); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Build grandparent.
+	v := s.version()
+	c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
+	rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
+	b := &tableCompactionBuilder{
+		s:         s,
+		c:         c,
+		rec:       rec,
+		stat1:     new(cStatsStaging),
+		minSeq:    0,
+		strict:    true,
+		tableSize: o.CompactionTableSize/3 + 961,
+	}
+	if err := b.run(new(compactionTransactCounter)); err != nil {
+		t.Fatal(err)
+	}
+	for _, t := range c.tables[0] {
+		rec.delTable(c.level, t.file.Num())
+	}
+	if err := s.commit(rec); err != nil {
+		t.Fatal(err)
+	}
+	c.release()
+
+	// Build level-1.
+	v = s.version()
+	c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...))
+	rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
+	b = &tableCompactionBuilder{
+		s:         s,
+		c:         c,
+		rec:       rec,
+		stat1:     new(cStatsStaging),
+		minSeq:    0,
+		strict:    true,
+		tableSize: o.CompactionTableSize,
+	}
+	if err := b.run(new(compactionTransactCounter)); err != nil {
+		t.Fatal(err)
+	}
+	for _, t := range c.tables[0] {
+		rec.delTable(c.level, t.file.Num())
+	}
+	// Move grandparent to level-3
+	for _, t := range v.tables[2] {
+		rec.delTable(2, t.file.Num())
+		rec.addTableFile(3, t)
+	}
+	if err := s.commit(rec); err != nil {
+		t.Fatal(err)
+	}
+	c.release()
+
+	v = s.version()
+	for level, want := range []bool{false, true, false, true, false} {
+		got := len(v.tables[level]) > 0
+		if want != got {
+			t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
+		}
+	}
+	for i, f := range v.tables[1][:len(v.tables[1])-1] {
+		nf := v.tables[1][i+1]
+		if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
+			t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.file.Num(), nf.file.Num())
+		}
+	}
+	v.release()
+
+	// Compaction with transient error.
+	v = s.version()
+	c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
+	rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
+	b = &tableCompactionBuilder{
+		s:         s,
+		c:         c,
+		rec:       rec,
+		stat1:     new(cStatsStaging),
+		minSeq:    0,
+		strict:    true,
+		tableSize: o.CompactionTableSize,
+	}
+	stor.SetEmuErrOnce(storage.TypeTable, tsOpSync)
+	stor.SetEmuRandErr(storage.TypeTable, tsOpRead, tsOpReadAt, tsOpWrite)
+	stor.SetEmuRandErrProb(0xf0)
+	for {
+		if err := b.run(new(compactionTransactCounter)); err != nil {
+			t.Logf("(expected) b.run: %v", err)
+		} else {
+			break
+		}
+	}
+	if err := s.commit(rec); err != nil {
+		t.Fatal(err)
+	}
+	c.release()
+
+	stor.SetEmuErrOnce(0, tsOpSync)
+	stor.SetEmuRandErr(0, tsOpRead, tsOpReadAt, tsOpWrite)
+
+	v = s.version()
+	if len(v.tables[1]) != len(v.tables[2]) {
+		t.Fatalf("invalid tables length, want %d, got %d", len(v.tables[1]), len(v.tables[2]))
+	}
+	for i, f0 := range v.tables[1] {
+		f1 := v.tables[2][i]
+		iter0 := s.tops.newIterator(f0, nil, nil)
+		iter1 := s.tops.newIterator(f1, nil, nil)
+		for j := 0; true; j++ {
+			next0 := iter0.Next()
+			next1 := iter1.Next()
+			if next0 != next1 {
+				t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1)
+			}
+			key0 := iter0.Key()
+			key1 := iter1.Key()
+			if !bytes.Equal(key0, key1) {
+				t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1)
+			}
+			if next0 == false {
+				break
+			}
+		}
+		iter0.Release()
+		iter1.Release()
+	}
+	v.release()
 }