| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628 |
- package main
- import (
- "crypto/rand"
- "encoding/binary"
- "flag"
- "fmt"
- "log"
- mrand "math/rand"
- "net/http"
- _ "net/http/pprof"
- "os"
- "os/signal"
- "path"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/storage"
- "github.com/syndtr/goleveldb/leveldb/table"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
- var (
- dbPath = path.Join(os.TempDir(), "goleveldb-testdb")
- openFilesCacheCapacity = 500
- keyLen = 63
- valueLen = 256
- numKeys = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743}
- httpProf = "127.0.0.1:5454"
- transactionProb = 0.5
- enableBlockCache = false
- enableCompression = false
- enableBufferPool = false
- wg = new(sync.WaitGroup)
- done, fail uint32
- bpool *util.BufferPool
- )
- type arrayInt []int
- func (a arrayInt) String() string {
- var str string
- for i, n := range a {
- if i > 0 {
- str += ","
- }
- str += strconv.Itoa(n)
- }
- return str
- }
- func (a *arrayInt) Set(str string) error {
- var na arrayInt
- for _, s := range strings.Split(str, ",") {
- s = strings.TrimSpace(s)
- if s != "" {
- n, err := strconv.Atoi(s)
- if err != nil {
- return err
- }
- na = append(na, n)
- }
- }
- *a = na
- return nil
- }
- func init() {
- flag.StringVar(&dbPath, "db", dbPath, "testdb path")
- flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity")
- flag.IntVar(&keyLen, "keylen", keyLen, "key length")
- flag.IntVar(&valueLen, "valuelen", valueLen, "value length")
- flag.Var(&numKeys, "numkeys", "num keys")
- flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr")
- flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probablity of writes using transaction")
- flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool")
- flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache")
- flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression")
- }
- func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte {
- if dataLen < (2+4+4)*2+4 {
- panic("dataLen is too small")
- }
- if cap(dst) < dataLen {
- dst = make([]byte, dataLen)
- } else {
- dst = dst[:dataLen]
- }
- half := (dataLen - 4) / 2
- if _, err := rand.Reader.Read(dst[2 : half-8]); err != nil {
- panic(err)
- }
- dst[0] = ns
- dst[1] = prefix
- binary.LittleEndian.PutUint32(dst[half-8:], i)
- binary.LittleEndian.PutUint32(dst[half-8:], i)
- binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value())
- full := half * 2
- copy(dst[half:full], dst[:half])
- if full < dataLen-4 {
- if _, err := rand.Reader.Read(dst[full : dataLen-4]); err != nil {
- panic(err)
- }
- }
- binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value())
- return dst
- }
- func dataSplit(data []byte) (data0, data1 []byte) {
- n := (len(data) - 4) / 2
- return data[:n], data[n : n+n]
- }
- func dataNS(data []byte) byte {
- return data[0]
- }
- func dataPrefix(data []byte) byte {
- return data[1]
- }
- func dataI(data []byte) uint32 {
- return binary.LittleEndian.Uint32(data[(len(data)-4)/2-8:])
- }
- func dataChecksum(data []byte) (uint32, uint32) {
- checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:])
- checksum1 := util.NewCRC(data[:len(data)-4]).Value()
- return checksum0, checksum1
- }
- func dataPrefixSlice(ns, prefix byte) *util.Range {
- return util.BytesPrefix([]byte{ns, prefix})
- }
- func dataNsSlice(ns byte) *util.Range {
- return util.BytesPrefix([]byte{ns})
- }
- type testingStorage struct {
- storage.Storage
- }
- func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) {
- r, err := ts.Open(fd)
- if err != nil {
- log.Fatal(err)
- }
- defer r.Close()
- size, err := r.Seek(0, os.SEEK_END)
- if err != nil {
- log.Fatal(err)
- }
- o := &opt.Options{
- DisableLargeBatchTransaction: true,
- Strict: opt.NoStrict,
- }
- if checksum {
- o.Strict = opt.StrictBlockChecksum | opt.StrictReader
- }
- tr, err := table.NewReader(r, size, fd, nil, bpool, o)
- if err != nil {
- log.Fatal(err)
- }
- defer tr.Release()
- checkData := func(i int, t string, data []byte) bool {
- if len(data) == 0 {
- panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t))
- }
- checksum0, checksum1 := dataChecksum(data)
- if checksum0 != checksum1 {
- atomic.StoreUint32(&fail, 1)
- atomic.StoreUint32(&done, 1)
- corrupted = true
- data0, data1 := dataSplit(data)
- data0c0, data0c1 := dataChecksum(data0)
- data1c0, data1c1 := dataChecksum(data1)
- log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
- fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
- return true
- }
- return false
- }
- iter := tr.NewIterator(nil, nil)
- defer iter.Release()
- for i := 0; iter.Next(); i++ {
- ukey, _, kt, kerr := parseIkey(iter.Key())
- if kerr != nil {
- atomic.StoreUint32(&fail, 1)
- atomic.StoreUint32(&done, 1)
- corrupted = true
- log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr)
- return
- }
- if checkData(i, "key", ukey) {
- return
- }
- if kt == ktVal && checkData(i, "value", iter.Value()) {
- return
- }
- }
- if err := iter.Error(); err != nil {
- if errors.IsCorrupted(err) {
- atomic.StoreUint32(&fail, 1)
- atomic.StoreUint32(&done, 1)
- corrupted = true
- log.Printf("FATAL: [%v] Corruption detected: %v", fd, err)
- } else {
- log.Fatal(err)
- }
- }
- return
- }
- func (ts *testingStorage) Remove(fd storage.FileDesc) error {
- if atomic.LoadUint32(&fail) == 1 {
- return nil
- }
- if fd.Type == storage.TypeTable {
- if ts.scanTable(fd, true) {
- return nil
- }
- }
- return ts.Storage.Remove(fd)
- }
- type latencyStats struct {
- mark time.Time
- dur, min, max time.Duration
- num int
- }
- func (s *latencyStats) start() {
- s.mark = time.Now()
- }
- func (s *latencyStats) record(n int) {
- if s.mark.IsZero() {
- panic("not started")
- }
- dur := time.Now().Sub(s.mark)
- dur1 := dur / time.Duration(n)
- if dur1 < s.min || s.min == 0 {
- s.min = dur1
- }
- if dur1 > s.max {
- s.max = dur1
- }
- s.dur += dur
- s.num += n
- s.mark = time.Time{}
- }
- func (s *latencyStats) ratePerSec() int {
- durSec := s.dur / time.Second
- if durSec > 0 {
- return s.num / int(durSec)
- }
- return s.num
- }
- func (s *latencyStats) avg() time.Duration {
- if s.num > 0 {
- return s.dur / time.Duration(s.num)
- }
- return 0
- }
- func (s *latencyStats) add(x *latencyStats) {
- if x.min < s.min || s.min == 0 {
- s.min = x.min
- }
- if x.max > s.max {
- s.max = x.max
- }
- s.dur += x.dur
- s.num += x.num
- }
- func main() {
- flag.Parse()
- if enableBufferPool {
- bpool = util.NewBufferPool(opt.DefaultBlockSize + 128)
- }
- log.Printf("Test DB stored at %q", dbPath)
- if httpProf != "" {
- log.Printf("HTTP pprof listening at %q", httpProf)
- runtime.SetBlockProfileRate(1)
- go func() {
- if err := http.ListenAndServe(httpProf, nil); err != nil {
- log.Fatalf("HTTPPROF: %v", err)
- }
- }()
- }
- runtime.GOMAXPROCS(runtime.NumCPU())
- os.RemoveAll(dbPath)
- stor, err := storage.OpenFile(dbPath, false)
- if err != nil {
- log.Fatal(err)
- }
- tstor := &testingStorage{stor}
- defer tstor.Close()
- fatalf := func(err error, format string, v ...interface{}) {
- atomic.StoreUint32(&fail, 1)
- atomic.StoreUint32(&done, 1)
- log.Printf("FATAL: "+format, v...)
- if err != nil && errors.IsCorrupted(err) {
- cerr := err.(*errors.ErrCorrupted)
- if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
- log.Print("FATAL: corruption detected, scanning...")
- if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
- log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
- }
- }
- }
- runtime.Goexit()
- }
- if openFilesCacheCapacity == 0 {
- openFilesCacheCapacity = -1
- }
- o := &opt.Options{
- OpenFilesCacheCapacity: openFilesCacheCapacity,
- DisableBufferPool: !enableBufferPool,
- DisableBlockCache: !enableBlockCache,
- ErrorIfExist: true,
- Compression: opt.NoCompression,
- }
- if enableCompression {
- o.Compression = opt.DefaultCompression
- }
- db, err := leveldb.Open(tstor, o)
- if err != nil {
- log.Fatal(err)
- }
- defer db.Close()
- var (
- mu = &sync.Mutex{}
- gGetStat = &latencyStats{}
- gIterStat = &latencyStats{}
- gWriteStat = &latencyStats{}
- gTrasactionStat = &latencyStats{}
- startTime = time.Now()
- writeReq = make(chan *leveldb.Batch)
- writeAck = make(chan error)
- writeAckAck = make(chan struct{})
- )
- go func() {
- for b := range writeReq {
- var err error
- if mrand.Float64() < transactionProb {
- log.Print("> Write using transaction")
- gTrasactionStat.start()
- var tr *leveldb.Transaction
- if tr, err = db.OpenTransaction(); err == nil {
- if err = tr.Write(b, nil); err == nil {
- if err = tr.Commit(); err == nil {
- gTrasactionStat.record(b.Len())
- }
- } else {
- tr.Discard()
- }
- }
- } else {
- gWriteStat.start()
- if err = db.Write(b, nil); err == nil {
- gWriteStat.record(b.Len())
- }
- }
- writeAck <- err
- <-writeAckAck
- }
- }()
- go func() {
- for {
- time.Sleep(3 * time.Second)
- log.Print("------------------------")
- log.Printf("> Elapsed=%v", time.Now().Sub(startTime))
- mu.Lock()
- log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d",
- gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec())
- log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d",
- gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec())
- log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d",
- gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec())
- log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d",
- gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec())
- mu.Unlock()
- cachedblock, _ := db.GetProperty("leveldb.cachedblock")
- openedtables, _ := db.GetProperty("leveldb.openedtables")
- alivesnaps, _ := db.GetProperty("leveldb.alivesnaps")
- aliveiters, _ := db.GetProperty("leveldb.aliveiters")
- blockpool, _ := db.GetProperty("leveldb.blockpool")
- log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q",
- cachedblock, openedtables, alivesnaps, aliveiters, blockpool)
- log.Print("------------------------")
- }
- }()
- for ns, numKey := range numKeys {
- func(ns, numKey int) {
- log.Printf("[%02d] STARTING: numKey=%d", ns, numKey)
- keys := make([][]byte, numKey)
- for i := range keys {
- keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen)
- }
- wg.Add(1)
- go func() {
- var wi uint32
- defer func() {
- log.Printf("[%02d] WRITER DONE #%d", ns, wi)
- wg.Done()
- }()
- var (
- b = new(leveldb.Batch)
- k2, v2 []byte
- nReader int32
- )
- for atomic.LoadUint32(&done) == 0 {
- log.Printf("[%02d] WRITER #%d", ns, wi)
- b.Reset()
- for _, k1 := range keys {
- k2 = randomData(k2, byte(ns), 2, wi, keyLen)
- v2 = randomData(v2, byte(ns), 3, wi, valueLen)
- b.Put(k2, v2)
- b.Put(k1, k2)
- }
- writeReq <- b
- if err := <-writeAck; err != nil {
- writeAckAck <- struct{}{}
- fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err)
- }
- snap, err := db.GetSnapshot()
- if err != nil {
- writeAckAck <- struct{}{}
- fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err)
- }
- writeAckAck <- struct{}{}
- wg.Add(1)
- atomic.AddInt32(&nReader, 1)
- go func(snapwi uint32, snap *leveldb.Snapshot) {
- var (
- ri int
- iterStat = &latencyStats{}
- getStat = &latencyStats{}
- )
- defer func() {
- mu.Lock()
- gGetStat.add(getStat)
- gIterStat.add(iterStat)
- mu.Unlock()
- atomic.AddInt32(&nReader, -1)
- log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg())
- snap.Release()
- wg.Done()
- }()
- stopi := snapwi + 3
- for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 {
- var n int
- iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil)
- iterStat.start()
- for iter.Next() {
- k1 := iter.Key()
- k2 := iter.Value()
- iterStat.record(1)
- if dataNS(k2) != byte(ns) {
- fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2))
- }
- kwritei := dataI(k2)
- if kwritei != snapwi {
- fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei)
- }
- getStat.start()
- v2, err := snap.Get(k2, nil)
- if err != nil {
- fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
- }
- getStat.record(1)
- if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 {
- err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
- fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
- }
- n++
- iterStat.start()
- }
- iter.Release()
- if err := iter.Error(); err != nil {
- fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err)
- }
- if n != numKey {
- fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n)
- }
- ri++
- }
- }(wi, snap)
- atomic.AddUint32(&wi, 1)
- }
- }()
- delB := new(leveldb.Batch)
- wg.Add(1)
- go func() {
- var (
- i int
- iterStat = &latencyStats{}
- )
- defer func() {
- log.Printf("[%02d] SCANNER DONE #%d", ns, i)
- wg.Done()
- }()
- time.Sleep(2 * time.Second)
- for atomic.LoadUint32(&done) == 0 {
- var n int
- delB.Reset()
- iter := db.NewIterator(dataNsSlice(byte(ns)), nil)
- iterStat.start()
- for iter.Next() && atomic.LoadUint32(&done) == 0 {
- k := iter.Key()
- v := iter.Value()
- iterStat.record(1)
- for ci, x := range [...][]byte{k, v} {
- checksum0, checksum1 := dataChecksum(x)
- if checksum0 != checksum1 {
- if ci == 0 {
- fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
- } else {
- fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
- }
- }
- }
- if dataPrefix(k) == 2 || mrand.Int()%999 == 0 {
- delB.Delete(k)
- }
- n++
- iterStat.start()
- }
- iter.Release()
- if err := iter.Error(); err != nil {
- fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err)
- }
- if n > 0 {
- log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg())
- }
- if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 {
- t := time.Now()
- writeReq <- delB
- if err := <-writeAck; err != nil {
- writeAckAck <- struct{}{}
- fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err)
- } else {
- writeAckAck <- struct{}{}
- }
- log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t))
- }
- i++
- }
- }()
- }(ns, numKey)
- }
- go func() {
- sig := make(chan os.Signal)
- signal.Notify(sig, os.Interrupt, os.Kill)
- log.Printf("Got signal: %v, exiting...", <-sig)
- atomic.StoreUint32(&done, 1)
- }()
- wg.Wait()
- }
|