concurrency_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package files_test
  2. import (
  3. "crypto/rand"
  4. "fmt"
  5. "log"
  6. "os"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/syndtr/goleveldb/leveldb"
  11. "github.com/syndtr/goleveldb/leveldb/util"
  12. )
  13. var items map[string][]byte
  14. var keys map[string]string
  15. const nItems = 10000
  16. func setupMaps() {
  17. // Set up two simple maps, one "key" => data and one "indirect key" =>
  18. // "key".
  19. items = make(map[string][]byte, nItems)
  20. keys = make(map[string]string, nItems)
  21. for i := 0; i < nItems; i++ {
  22. k1 := fmt.Sprintf("key%d", i)
  23. data := make([]byte, 87)
  24. _, err := rand.Reader.Read(data)
  25. if err != nil {
  26. panic(err)
  27. }
  28. items[k1] = data
  29. k2 := fmt.Sprintf("indirect%d", i)
  30. keys[k2] = k1
  31. }
  32. }
  33. func makeK1(s string) []byte {
  34. k1 := make([]byte, 1+len(s))
  35. k1[0] = 1
  36. copy(k1[1:], []byte(s))
  37. return k1
  38. }
  39. func makeK2(s string) []byte {
  40. k2 := make([]byte, 1+len(s))
  41. k2[0] = 2 // Only difference from makeK1
  42. copy(k2[1:], []byte(s))
  43. return k2
  44. }
  45. func setItems(db *leveldb.DB) error {
  46. snap, err := db.GetSnapshot()
  47. if err != nil {
  48. return err
  49. }
  50. defer snap.Release()
  51. batch := &leveldb.Batch{}
  52. for k2, k1 := range keys {
  53. // Create k1 => item mapping first
  54. batch.Put(makeK1(k1), items[k1])
  55. // Then the k2 => k1 mapping
  56. batch.Put(makeK2(k2), makeK1(k1))
  57. }
  58. return db.Write(batch, nil)
  59. }
  60. func clearItems(db *leveldb.DB) error {
  61. snap, err := db.GetSnapshot()
  62. if err != nil {
  63. return err
  64. }
  65. defer snap.Release()
  66. // Iterate from the start of k2 space to the end
  67. it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil)
  68. defer it.Release()
  69. batch := &leveldb.Batch{}
  70. for it.Next() {
  71. k2 := it.Key()
  72. k1 := it.Value()
  73. // k1 should exist
  74. _, err := snap.Get(k1, nil)
  75. if err != nil {
  76. return err
  77. }
  78. // Delete the k2 => k1 mapping first
  79. batch.Delete(k2)
  80. // Then the k1 => key mapping
  81. batch.Delete(k1)
  82. }
  83. return db.Write(batch, nil)
  84. }
  85. func scanItems(db *leveldb.DB) error {
  86. snap, err := db.GetSnapshot()
  87. if err != nil {
  88. return err
  89. }
  90. defer snap.Release()
  91. // Iterate from the start of k2 space to the end
  92. it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil)
  93. defer it.Release()
  94. for it.Next() {
  95. // k2 => k1 => data
  96. k2 := it.Key()
  97. k1 := it.Value()
  98. _, err := snap.Get(k1, nil)
  99. if err != nil {
  100. log.Printf("k1: %q (%x)", k1, k1)
  101. log.Printf("k2: %q (%x)", k2, k2)
  102. return err
  103. }
  104. }
  105. return nil
  106. }
  107. func TestConcurrent(t *testing.T) {
  108. setupMaps()
  109. dur := 2 * time.Second
  110. t0 := time.Now()
  111. var wg sync.WaitGroup
  112. os.RemoveAll("testdata/global.db")
  113. db, err := leveldb.OpenFile("testdata/global.db", nil)
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. defer os.RemoveAll("testdata/global.db")
  118. wg.Add(1)
  119. go func() {
  120. defer wg.Done()
  121. for time.Since(t0) < dur {
  122. if err := setItems(db); err != nil {
  123. t.Fatal(err)
  124. }
  125. if err := clearItems(db); err != nil {
  126. t.Fatal(err)
  127. }
  128. }
  129. }()
  130. wg.Add(1)
  131. go func() {
  132. defer wg.Done()
  133. for time.Since(t0) < dur {
  134. if err := scanItems(db); err != nil {
  135. t.Fatal(err)
  136. }
  137. }
  138. }()
  139. wg.Wait()
  140. db.Close()
  141. }