concurrency_test.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // +build ignore // this is a really tedious test for an old issue
  7. package db_test
  8. import (
  9. "crypto/rand"
  10. "log"
  11. "os"
  12. "testing"
  13. "time"
  14. "github.com/syncthing/syncthing/lib/sync"
  15. "github.com/syndtr/goleveldb/leveldb"
  16. "github.com/syndtr/goleveldb/leveldb/opt"
  17. "github.com/syndtr/goleveldb/leveldb/util"
  18. )
  19. var keys [][]byte
  20. func init() {
  21. for i := 0; i < nItems; i++ {
  22. keys = append(keys, randomData(1))
  23. }
  24. }
  25. const nItems = 10000
  26. func randomData(prefix byte) []byte {
  27. data := make([]byte, 1+32+64+32)
  28. _, err := rand.Reader.Read(data)
  29. if err != nil {
  30. panic(err)
  31. }
  32. return append([]byte{prefix}, data...)
  33. }
  34. func setItems(db *leveldb.DB) error {
  35. batch := new(leveldb.Batch)
  36. for _, k1 := range keys {
  37. k2 := randomData(2)
  38. // k2 -> data
  39. batch.Put(k2, randomData(42))
  40. // k1 -> k2
  41. batch.Put(k1, k2)
  42. }
  43. if testing.Verbose() {
  44. log.Printf("batch write (set) %p", batch)
  45. }
  46. return db.Write(batch, nil)
  47. }
  48. func clearItems(db *leveldb.DB) error {
  49. snap, err := db.GetSnapshot()
  50. if err != nil {
  51. return err
  52. }
  53. defer snap.Release()
  54. // Iterate over k2
  55. it := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
  56. defer it.Release()
  57. batch := new(leveldb.Batch)
  58. for it.Next() {
  59. k1 := it.Key()
  60. k2 := it.Value()
  61. // k2 should exist
  62. _, err := snap.Get(k2, nil)
  63. if err != nil {
  64. return err
  65. }
  66. // Delete the k1 => k2 mapping first
  67. batch.Delete(k1)
  68. // Then the k2 => data mapping
  69. batch.Delete(k2)
  70. }
  71. if testing.Verbose() {
  72. log.Printf("batch write (clear) %p", batch)
  73. }
  74. return db.Write(batch, nil)
  75. }
  76. func scanItems(db *leveldb.DB) error {
  77. snap, err := db.GetSnapshot()
  78. if testing.Verbose() {
  79. log.Printf("snap create %p", snap)
  80. }
  81. if err != nil {
  82. return err
  83. }
  84. defer func() {
  85. if testing.Verbose() {
  86. log.Printf("snap release %p", snap)
  87. }
  88. snap.Release()
  89. }()
  90. // Iterate from the start of k2 space to the end
  91. it := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
  92. defer it.Release()
  93. i := 0
  94. for it.Next() {
  95. // k2 => k1 => data
  96. k1 := it.Key()
  97. k2 := it.Value()
  98. _, err := snap.Get(k2, nil)
  99. if err != nil {
  100. log.Printf("k1: %x", k1)
  101. log.Printf("k2: %x (missing)", k2)
  102. return err
  103. }
  104. i++
  105. }
  106. if testing.Verbose() {
  107. log.Println("scanned", i)
  108. }
  109. return nil
  110. }
  111. func TestConcurrentSetClear(t *testing.T) {
  112. if testing.Short() {
  113. return
  114. }
  115. dur := 30 * time.Second
  116. t0 := time.Now()
  117. wg := sync.NewWaitGroup()
  118. os.RemoveAll("testdata/concurrent-set-clear.db")
  119. db, err := leveldb.OpenFile("testdata/concurrent-set-clear.db", &opt.Options{OpenFilesCacheCapacity: 10})
  120. if err != nil {
  121. t.Fatal(err)
  122. }
  123. defer os.RemoveAll("testdata/concurrent-set-clear.db")
  124. errChan := make(chan error, 3)
  125. wg.Add(1)
  126. go func() {
  127. defer wg.Done()
  128. for time.Since(t0) < dur {
  129. if err := setItems(db); err != nil {
  130. errChan <- err
  131. return
  132. }
  133. if err := clearItems(db); err != nil {
  134. errChan <- err
  135. return
  136. }
  137. }
  138. }()
  139. wg.Add(1)
  140. go func() {
  141. defer wg.Done()
  142. for time.Since(t0) < dur {
  143. if err := scanItems(db); err != nil {
  144. errChan <- err
  145. return
  146. }
  147. }
  148. }()
  149. go func() {
  150. wg.Wait()
  151. errChan <- nil
  152. }()
  153. err = <-errChan
  154. if err != nil {
  155. t.Error(err)
  156. }
  157. db.Close()
  158. }
  159. func TestConcurrentSetOnly(t *testing.T) {
  160. if testing.Short() {
  161. return
  162. }
  163. dur := 30 * time.Second
  164. t0 := time.Now()
  165. wg := sync.NewWaitGroup()
  166. os.RemoveAll("testdata/concurrent-set-only.db")
  167. db, err := leveldb.OpenFile("testdata/concurrent-set-only.db", &opt.Options{OpenFilesCacheCapacity: 10})
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. defer os.RemoveAll("testdata/concurrent-set-only.db")
  172. errChan := make(chan error, 3)
  173. wg.Add(1)
  174. go func() {
  175. defer wg.Done()
  176. for time.Since(t0) < dur {
  177. if err := setItems(db); err != nil {
  178. errChan <- err
  179. return
  180. }
  181. }
  182. }()
  183. wg.Add(1)
  184. go func() {
  185. defer wg.Done()
  186. for time.Since(t0) < dur {
  187. if err := scanItems(db); err != nil {
  188. errChan <- err
  189. return
  190. }
  191. }
  192. }()
  193. go func() {
  194. wg.Wait()
  195. errChan <- nil
  196. }()
  197. err = <-errChan
  198. if err != nil {
  199. t.Error(err)
  200. }
  201. }