session.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Copyright (c) 2012, Suryandaru Triandana <[email protected]>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "fmt"
  9. "io"
  10. "os"
  11. "sync"
  12. "github.com/syndtr/goleveldb/leveldb/errors"
  13. "github.com/syndtr/goleveldb/leveldb/journal"
  14. "github.com/syndtr/goleveldb/leveldb/opt"
  15. "github.com/syndtr/goleveldb/leveldb/storage"
  16. )
  17. // ErrManifestCorrupted records manifest corruption.
  18. type ErrManifestCorrupted struct {
  19. Field string
  20. Reason string
  21. }
  22. func (e *ErrManifestCorrupted) Error() string {
  23. return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
  24. }
  25. func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
  26. return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
  27. }
  28. // session represent a persistent database session.
  29. type session struct {
  30. // Need 64-bit alignment.
  31. stNextFileNum int64 // current unused file number
  32. stJournalNum int64 // current journal file number; need external synchronization
  33. stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
  34. stTempFileNum int64
  35. stSeqNum uint64 // last mem compacted seq; need external synchronization
  36. stor storage.Storage
  37. storLock storage.Lock
  38. o *cachedOptions
  39. icmp *iComparer
  40. tops *tOps
  41. manifest *journal.Writer
  42. manifestWriter storage.Writer
  43. manifestFd storage.FileDesc
  44. stCompPtrs []internalKey // compaction pointers; need external synchronization
  45. stVersion *version // current version
  46. vmu sync.Mutex
  47. }
  48. // Creates new initialized session instance.
  49. func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
  50. if stor == nil {
  51. return nil, os.ErrInvalid
  52. }
  53. storLock, err := stor.Lock()
  54. if err != nil {
  55. return
  56. }
  57. s = &session{
  58. stor: stor,
  59. storLock: storLock,
  60. }
  61. s.setOptions(o)
  62. s.tops = newTableOps(s)
  63. s.setVersion(newVersion(s))
  64. s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
  65. return
  66. }
  67. // Close session.
  68. func (s *session) close() {
  69. s.tops.close()
  70. if s.manifest != nil {
  71. s.manifest.Close()
  72. }
  73. if s.manifestWriter != nil {
  74. s.manifestWriter.Close()
  75. }
  76. s.manifest = nil
  77. s.manifestWriter = nil
  78. s.setVersion(&version{s: s, closing: true})
  79. }
  80. // Release session lock.
  81. func (s *session) release() {
  82. s.storLock.Release()
  83. }
  84. // Create a new database session; need external synchronization.
  85. func (s *session) create() error {
  86. // create manifest
  87. return s.newManifest(nil, nil)
  88. }
  89. // Recover a database session; need external synchronization.
  90. func (s *session) recover() (err error) {
  91. defer func() {
  92. if os.IsNotExist(err) {
  93. // Don't return os.ErrNotExist if the underlying storage contains
  94. // other files that belong to LevelDB. So the DB won't get trashed.
  95. if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
  96. err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
  97. }
  98. }
  99. }()
  100. fd, err := s.stor.GetMeta()
  101. if err != nil {
  102. return
  103. }
  104. reader, err := s.stor.Open(fd)
  105. if err != nil {
  106. return
  107. }
  108. defer reader.Close()
  109. var (
  110. // Options.
  111. strict = s.o.GetStrict(opt.StrictManifest)
  112. jr = journal.NewReader(reader, dropper{s, fd}, strict, true)
  113. rec = &sessionRecord{}
  114. staging = s.stVersion.newStaging()
  115. )
  116. for {
  117. var r io.Reader
  118. r, err = jr.Next()
  119. if err != nil {
  120. if err == io.EOF {
  121. err = nil
  122. break
  123. }
  124. return errors.SetFd(err, fd)
  125. }
  126. err = rec.decode(r)
  127. if err == nil {
  128. // save compact pointers
  129. for _, r := range rec.compPtrs {
  130. s.setCompPtr(r.level, internalKey(r.ikey))
  131. }
  132. // commit record to version staging
  133. staging.commit(rec)
  134. } else {
  135. err = errors.SetFd(err, fd)
  136. if strict || !errors.IsCorrupted(err) {
  137. return
  138. }
  139. s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
  140. }
  141. rec.resetCompPtrs()
  142. rec.resetAddedTables()
  143. rec.resetDeletedTables()
  144. }
  145. switch {
  146. case !rec.has(recComparer):
  147. return newErrManifestCorrupted(fd, "comparer", "missing")
  148. case rec.comparer != s.icmp.uName():
  149. return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
  150. case !rec.has(recNextFileNum):
  151. return newErrManifestCorrupted(fd, "next-file-num", "missing")
  152. case !rec.has(recJournalNum):
  153. return newErrManifestCorrupted(fd, "journal-file-num", "missing")
  154. case !rec.has(recSeqNum):
  155. return newErrManifestCorrupted(fd, "seq-num", "missing")
  156. }
  157. s.manifestFd = fd
  158. s.setVersion(staging.finish())
  159. s.setNextFileNum(rec.nextFileNum)
  160. s.recordCommited(rec)
  161. return nil
  162. }
  163. // Commit session; need external synchronization.
  164. func (s *session) commit(r *sessionRecord) (err error) {
  165. v := s.version()
  166. defer v.release()
  167. // spawn new version based on current version
  168. nv := v.spawn(r)
  169. if s.manifest == nil {
  170. // manifest journal writer not yet created, create one
  171. err = s.newManifest(r, nv)
  172. } else {
  173. err = s.flushManifest(r)
  174. }
  175. // finally, apply new version if no error rise
  176. if err == nil {
  177. s.setVersion(nv)
  178. }
  179. return
  180. }