Bläddra i källkod

Test case and goleveldb fix (fixes #740, fixes #796)

Jakob Borg 11 år sedan
förälder
incheckning
0ebee92f7d

+ 1 - 1
Godeps/Godeps.json

@@ -48,7 +48,7 @@
 		},
 		{
 			"ImportPath": "github.com/syndtr/goleveldb/leveldb",
-			"Rev": "e2fa4e6ac1cc41a73bc9fd467878ecbf65df5cc3"
+			"Rev": "0d8857b7ec571b0a6c9677d8e6c0a4ceeabd1d71"
 		},
 		{
 			"ImportPath": "github.com/vitrun/qart/coding",

+ 1 - 1
Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go

@@ -71,7 +71,7 @@ func (db *DB) releaseSnapshot(elem *snapshotElement) {
 func (db *DB) minSeq() uint64 {
 	db.snapsMu.Lock()
 	defer db.snapsMu.Unlock()
-	elem := db.snapsRoot.prev
+	elem := db.snapsRoot.next
 	if elem != &db.snapsRoot {
 		return elem.seq
 	}

+ 138 - 0
Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go

@@ -1125,6 +1125,45 @@ func TestDb_Snapshot(t *testing.T) {
 	})
 }
 
+func TestDb_SnapshotList(t *testing.T) {
+	db := &DB{}
+	db.initSnapshot()
+	e0a := db.acquireSnapshot()
+	e0b := db.acquireSnapshot()
+	db.seq = 1
+	e1 := db.acquireSnapshot()
+	db.seq = 2
+	e2 := db.acquireSnapshot()
+
+	if db.minSeq() != 0 {
+		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
+	}
+	db.releaseSnapshot(e0a)
+	if db.minSeq() != 0 {
+		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
+	}
+	db.releaseSnapshot(e2)
+	if db.minSeq() != 0 {
+		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
+	}
+	db.releaseSnapshot(e0b)
+	if db.minSeq() != 1 {
+		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
+	}
+	e2 = db.acquireSnapshot()
+	if db.minSeq() != 1 {
+		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
+	}
+	db.releaseSnapshot(e1)
+	if db.minSeq() != 2 {
+		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
+	}
+	db.releaseSnapshot(e2)
+	if db.minSeq() != 2 {
+		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
+	}
+}
+
 func TestDb_HiddenValuesAreRemoved(t *testing.T) {
 	trun(t, func(h *dbHarness) {
 		s := h.db.s
@@ -1884,3 +1923,102 @@ func TestDb_LeveldbIssue200(t *testing.T) {
 	iter.Next()
 	assertBytes(t, []byte("5"), iter.Key())
 }
+
+func TestDb_GoleveldbIssue74(t *testing.T) {
+	h := newDbHarnessWopt(t, &opt.Options{
+		WriteBuffer: 1 * opt.MiB,
+	})
+	defer h.close()
+
+	const n, dur = 10000, 5 * time.Second
+
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	until := time.Now().Add(dur)
+	wg := new(sync.WaitGroup)
+	wg.Add(2)
+	var done uint32
+	go func() {
+		var i int
+		defer func() {
+			t.Logf("WRITER DONE #%d", i)
+			atomic.StoreUint32(&done, 1)
+			wg.Done()
+		}()
+
+		b := new(Batch)
+		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+			iv := fmt.Sprintf("VAL%010d", i)
+			for k := 0; k < n; k++ {
+				key := fmt.Sprintf("KEY%06d", k)
+				b.Put([]byte(key), []byte(key+iv))
+				b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key))
+			}
+			h.write(b)
+
+			b.Reset()
+			snap := h.getSnapshot()
+			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
+			var k int
+			for ; iter.Next(); k++ {
+				ptrKey := iter.Key()
+				key := iter.Value()
+
+				if _, err := snap.Get(ptrKey, nil); err != nil {
+					t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
+				}
+				if value, err := snap.Get(key, nil); err != nil {
+					t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err)
+				} else if string(value) != string(key)+iv {
+					t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
+				}
+
+				b.Delete(key)
+				b.Delete(ptrKey)
+			}
+			h.write(b)
+			iter.Release()
+			snap.Release()
+			if k != n {
+				t.Fatalf("#%d %d != %d", i, k, n)
+			}
+		}
+		t.Logf("writer done after %d iterations", i)
+	}()
+	go func() {
+		var i int
+		defer func() {
+			t.Logf("READER DONE #%d", i)
+			atomic.StoreUint32(&done, 1)
+			wg.Done()
+		}()
+		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+			snap := h.getSnapshot()
+			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
+			var prevValue string
+			var k int
+			for ; iter.Next(); k++ {
+				ptrKey := iter.Key()
+				key := iter.Value()
+
+				if _, err := snap.Get(ptrKey, nil); err != nil {
+					t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
+				}
+
+				if value, err := snap.Get(key, nil); err != nil {
+					t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err)
+				} else if prevValue != "" && string(value) != string(key)+prevValue {
+					t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
+				} else {
+					prevValue = string(value[len(key):])
+				}
+			}
+			iter.Release()
+			snap.Release()
+			if k > 0 && k != n {
+				t.Fatalf("#%d %d != %d", i, k, n)
+			}
+		}
+	}()
+	wg.Wait()
+}

+ 168 - 0
internal/files/concurrency_test.go

@@ -0,0 +1,168 @@
+package files_test
+
+import (
+	"crypto/rand"
+	"fmt"
+	"log"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/util"
+)
+
+var items map[string][]byte
+var keys map[string]string
+
+const nItems = 10000
+
+func setupMaps() {
+
+	// Set up two simple maps, one "key" => data and one "indirect key" =>
+	// "key".
+
+	items = make(map[string][]byte, nItems)
+	keys = make(map[string]string, nItems)
+
+	for i := 0; i < nItems; i++ {
+		k1 := fmt.Sprintf("key%d", i)
+		data := make([]byte, 87)
+		_, err := rand.Reader.Read(data)
+		if err != nil {
+			panic(err)
+		}
+		items[k1] = data
+
+		k2 := fmt.Sprintf("indirect%d", i)
+		keys[k2] = k1
+	}
+}
+
+func makeK1(s string) []byte {
+	k1 := make([]byte, 1+len(s))
+	k1[0] = 1
+	copy(k1[1:], []byte(s))
+	return k1
+}
+
+func makeK2(s string) []byte {
+	k2 := make([]byte, 1+len(s))
+	k2[0] = 2 // Only difference from makeK1
+	copy(k2[1:], []byte(s))
+	return k2
+}
+
+func setItems(db *leveldb.DB) error {
+	snap, err := db.GetSnapshot()
+	if err != nil {
+		return err
+	}
+	defer snap.Release()
+
+	batch := &leveldb.Batch{}
+	for k2, k1 := range keys {
+		// Create k1 => item mapping first
+		batch.Put(makeK1(k1), items[k1])
+		// Then the k2 => k1 mapping
+		batch.Put(makeK2(k2), makeK1(k1))
+	}
+	return db.Write(batch, nil)
+}
+
+func clearItems(db *leveldb.DB) error {
+	snap, err := db.GetSnapshot()
+	if err != nil {
+		return err
+	}
+	defer snap.Release()
+
+	// Iterate from the start of k2 space to the end
+	it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil)
+	defer it.Release()
+
+	batch := &leveldb.Batch{}
+	for it.Next() {
+		k2 := it.Key()
+		k1 := it.Value()
+
+		// k1 should exist
+		_, err := snap.Get(k1, nil)
+		if err != nil {
+			return err
+		}
+
+		// Delete the k2 => k1 mapping first
+		batch.Delete(k2)
+		// Then the k1 => key mapping
+		batch.Delete(k1)
+	}
+	return db.Write(batch, nil)
+}
+
+func scanItems(db *leveldb.DB) error {
+	snap, err := db.GetSnapshot()
+	if err != nil {
+		return err
+	}
+	defer snap.Release()
+
+	// Iterate from the start of k2 space to the end
+	it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil)
+	defer it.Release()
+
+	for it.Next() {
+		// k2 => k1 => data
+		k2 := it.Key()
+		k1 := it.Value()
+		_, err := snap.Get(k1, nil)
+		if err != nil {
+			log.Printf("k1: %q (%x)", k1, k1)
+			log.Printf("k2: %q (%x)", k2, k2)
+			return err
+		}
+	}
+	return nil
+}
+
+func TestConcurrent(t *testing.T) {
+	setupMaps()
+
+	dur := 2 * time.Second
+	t0 := time.Now()
+	var wg sync.WaitGroup
+
+	os.RemoveAll("testdata/global.db")
+	db, err := leveldb.OpenFile("testdata/global.db", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll("testdata/global.db")
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for time.Since(t0) < dur {
+			if err := setItems(db); err != nil {
+				t.Fatal(err)
+			}
+			if err := clearItems(db); err != nil {
+				t.Fatal(err)
+			}
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for time.Since(t0) < dur {
+			if err := scanItems(db); err != nil {
+				t.Fatal(err)
+			}
+		}
+	}()
+
+	wg.Wait()
+	db.Close()
+}

+ 0 - 61
internal/files/set_test.go

@@ -862,64 +862,3 @@ func TestLongPath(t *testing.T) {
 			gf[0].Name, local[0].Name)
 	}
 }
-
-/*
-var gf protocol.FileInfo
-
-func TestStressGlobalVersion(t *testing.T) {
-	dur := 15 * time.Second
-	if testing.Short() {
-		dur = 1 * time.Second
-	}
-
-	set1 := []protocol.FileInfo{
-		protocol.FileInfo{Name: "a", Version: 1000},
-		protocol.FileInfo{Name: "b", Version: 1000},
-	}
-	set2 := []protocol.FileInfo{
-		protocol.FileInfo{Name: "b", Version: 1001},
-		protocol.FileInfo{Name: "c", Version: 1000},
-	}
-
-	db, err := leveldb.OpenFile("testdata/global.db", nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	m := files.NewSet("test", db)
-
-	done := make(chan struct{})
-	go stressWriter(m, remoteDevice0, set1, nil, done)
-	go stressWriter(m, protocol.LocalDeviceID, set2, nil, done)
-
-	t0 := time.Now()
-	for time.Since(t0) < dur {
-		m.WithGlobal(func(f protocol.FileInfo) bool {
-			gf = f
-			return true
-		})
-	}
-
-	close(done)
-}
-
-func stressWriter(s *files.Set, id protocol.DeviceID, set1, set2 []protocol.FileInfo, done chan struct{}) {
-	one := true
-	i := 0
-	for {
-		select {
-		case <-done:
-			return
-
-		default:
-			if one {
-				s.Replace(id, set1)
-			} else {
-				s.Replace(id, set2)
-			}
-			one = !one
-		}
-		i++
-	}
-}
-*/