db_state.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. // Copyright (c) 2013, Suryandaru Triandana <[email protected]>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "sync/atomic"
  9. "time"
  10. "github.com/syndtr/goleveldb/leveldb/journal"
  11. "github.com/syndtr/goleveldb/leveldb/memdb"
  12. "github.com/syndtr/goleveldb/leveldb/storage"
  13. )
  14. type memDB struct {
  15. db *DB
  16. *memdb.DB
  17. ref int32
  18. }
  19. func (m *memDB) getref() int32 {
  20. return atomic.LoadInt32(&m.ref)
  21. }
  22. func (m *memDB) incref() {
  23. atomic.AddInt32(&m.ref, 1)
  24. }
  25. func (m *memDB) decref() {
  26. if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
  27. // Only put back memdb with std capacity.
  28. if m.Capacity() == m.db.s.o.GetWriteBuffer() {
  29. m.Reset()
  30. m.db.mpoolPut(m.DB)
  31. }
  32. m.db = nil
  33. m.DB = nil
  34. } else if ref < 0 {
  35. panic("negative memdb ref")
  36. }
  37. }
  38. // Get latest sequence number.
  39. func (db *DB) getSeq() uint64 {
  40. return atomic.LoadUint64(&db.seq)
  41. }
  42. // Atomically adds delta to seq.
  43. func (db *DB) addSeq(delta uint64) {
  44. atomic.AddUint64(&db.seq, delta)
  45. }
  46. func (db *DB) setSeq(seq uint64) {
  47. atomic.StoreUint64(&db.seq, seq)
  48. }
  49. func (db *DB) sampleSeek(ikey internalKey) {
  50. v := db.s.version()
  51. if v.sampleSeek(ikey) {
  52. // Trigger table compaction.
  53. db.compTrigger(db.tcompCmdC)
  54. }
  55. v.release()
  56. }
  57. func (db *DB) mpoolPut(mem *memdb.DB) {
  58. if !db.isClosed() {
  59. select {
  60. case db.memPool <- mem:
  61. default:
  62. }
  63. }
  64. }
  65. func (db *DB) mpoolGet(n int) *memDB {
  66. var mdb *memdb.DB
  67. select {
  68. case mdb = <-db.memPool:
  69. default:
  70. }
  71. if mdb == nil || mdb.Capacity() < n {
  72. mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
  73. }
  74. return &memDB{
  75. db: db,
  76. DB: mdb,
  77. }
  78. }
  79. func (db *DB) mpoolDrain() {
  80. ticker := time.NewTicker(30 * time.Second)
  81. for {
  82. select {
  83. case <-ticker.C:
  84. select {
  85. case <-db.memPool:
  86. default:
  87. }
  88. case _, _ = <-db.closeC:
  89. ticker.Stop()
  90. // Make sure the pool is drained.
  91. select {
  92. case <-db.memPool:
  93. case <-time.After(time.Second):
  94. }
  95. close(db.memPool)
  96. return
  97. }
  98. }
  99. }
  100. // Create new memdb and froze the old one; need external synchronization.
  101. // newMem only called synchronously by the writer.
  102. func (db *DB) newMem(n int) (mem *memDB, err error) {
  103. fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
  104. w, err := db.s.stor.Create(fd)
  105. if err != nil {
  106. db.s.reuseFileNum(fd.Num)
  107. return
  108. }
  109. db.memMu.Lock()
  110. defer db.memMu.Unlock()
  111. if db.frozenMem != nil {
  112. panic("still has frozen mem")
  113. }
  114. if db.journal == nil {
  115. db.journal = journal.NewWriter(w)
  116. } else {
  117. db.journal.Reset(w)
  118. db.journalWriter.Close()
  119. db.frozenJournalFd = db.journalFd
  120. }
  121. db.journalWriter = w
  122. db.journalFd = fd
  123. db.frozenMem = db.mem
  124. mem = db.mpoolGet(n)
  125. mem.incref() // for self
  126. mem.incref() // for caller
  127. db.mem = mem
  128. // The seq only incremented by the writer. And whoever called newMem
  129. // should hold write lock, so no need additional synchronization here.
  130. db.frozenSeq = db.seq
  131. return
  132. }
  133. // Get all memdbs.
  134. func (db *DB) getMems() (e, f *memDB) {
  135. db.memMu.RLock()
  136. defer db.memMu.RUnlock()
  137. if db.mem != nil {
  138. db.mem.incref()
  139. } else if !db.isClosed() {
  140. panic("nil effective mem")
  141. }
  142. if db.frozenMem != nil {
  143. db.frozenMem.incref()
  144. }
  145. return db.mem, db.frozenMem
  146. }
  147. // Get frozen memdb.
  148. func (db *DB) getEffectiveMem() *memDB {
  149. db.memMu.RLock()
  150. defer db.memMu.RUnlock()
  151. if db.mem != nil {
  152. db.mem.incref()
  153. } else if !db.isClosed() {
  154. panic("nil effective mem")
  155. }
  156. return db.mem
  157. }
  158. // Check whether we has frozen memdb.
  159. func (db *DB) hasFrozenMem() bool {
  160. db.memMu.RLock()
  161. defer db.memMu.RUnlock()
  162. return db.frozenMem != nil
  163. }
  164. // Get frozen memdb.
  165. func (db *DB) getFrozenMem() *memDB {
  166. db.memMu.RLock()
  167. defer db.memMu.RUnlock()
  168. if db.frozenMem != nil {
  169. db.frozenMem.incref()
  170. }
  171. return db.frozenMem
  172. }
  173. // Drop frozen memdb; assume that frozen memdb isn't nil.
  174. func (db *DB) dropFrozenMem() {
  175. db.memMu.Lock()
  176. if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
  177. db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
  178. } else {
  179. db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
  180. }
  181. db.frozenJournalFd = storage.FileDesc{}
  182. db.frozenMem.decref()
  183. db.frozenMem = nil
  184. db.memMu.Unlock()
  185. }
  186. // Clear mems ptr; used by DB.Close().
  187. func (db *DB) clearMems() {
  188. db.memMu.Lock()
  189. db.mem = nil
  190. db.frozenMem = nil
  191. db.memMu.Unlock()
  192. }
  193. // Set closed flag; return true if not already closed.
  194. func (db *DB) setClosed() bool {
  195. return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
  196. }
  197. // Check whether DB was closed.
  198. func (db *DB) isClosed() bool {
  199. return atomic.LoadUint32(&db.closed) != 0
  200. }
  201. // Check read ok status.
  202. func (db *DB) ok() error {
  203. if db.isClosed() {
  204. return ErrClosed
  205. }
  206. return nil
  207. }