db_write.go 7.8 KB


  1. // Copyright (c) 2012, Suryandaru Triandana <[email protected]>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "time"
  9. "github.com/syndtr/goleveldb/leveldb/memdb"
  10. "github.com/syndtr/goleveldb/leveldb/opt"
  11. "github.com/syndtr/goleveldb/leveldb/util"
  12. )
  13. func (db *DB) writeJournal(b *Batch) error {
  14. w, err := db.journal.Next()
  15. if err != nil {
  16. return err
  17. }
  18. if _, err := w.Write(b.encode()); err != nil {
  19. return err
  20. }
  21. if err := db.journal.Flush(); err != nil {
  22. return err
  23. }
  24. if b.sync {
  25. return db.journalWriter.Sync()
  26. }
  27. return nil
  28. }
  29. func (db *DB) jWriter() {
  30. defer db.closeW.Done()
  31. for {
  32. select {
  33. case b := <-db.journalC:
  34. if b != nil {
  35. db.journalAckC <- db.writeJournal(b)
  36. }
  37. case _, _ = <-db.closeC:
  38. return
  39. }
  40. }
  41. }
  42. func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
  43. // Wait for pending memdb compaction.
  44. err = db.compTriggerWait(db.mcompCmdC)
  45. if err != nil {
  46. return
  47. }
  48. // Create new memdb and journal.
  49. mem, err = db.newMem(n)
  50. if err != nil {
  51. return
  52. }
  53. // Schedule memdb compaction.
  54. if wait {
  55. err = db.compTriggerWait(db.mcompCmdC)
  56. } else {
  57. db.compTrigger(db.mcompCmdC)
  58. }
  59. return
  60. }
  61. func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
  62. delayed := false
  63. flush := func() (retry bool) {
  64. v := db.s.version()
  65. defer v.release()
  66. mdb = db.getEffectiveMem()
  67. if mdb == nil {
  68. err = ErrClosed
  69. return false
  70. }
  71. defer func() {
  72. if retry {
  73. mdb.decref()
  74. mdb = nil
  75. }
  76. }()
  77. mdbFree = mdb.Free()
  78. switch {
  79. case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
  80. delayed = true
  81. time.Sleep(time.Millisecond)
  82. case mdbFree >= n:
  83. return false
  84. case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
  85. delayed = true
  86. err = db.compTriggerWait(db.tcompCmdC)
  87. if err != nil {
  88. return false
  89. }
  90. default:
  91. // Allow memdb to grow if it has no entry.
  92. if mdb.Len() == 0 {
  93. mdbFree = n
  94. } else {
  95. mdb.decref()
  96. mdb, err = db.rotateMem(n, false)
  97. if err == nil {
  98. mdbFree = mdb.Free()
  99. } else {
  100. mdbFree = 0
  101. }
  102. }
  103. return false
  104. }
  105. return true
  106. }
  107. start := time.Now()
  108. for flush() {
  109. }
  110. if delayed {
  111. db.writeDelay += time.Since(start)
  112. db.writeDelayN++
  113. } else if db.writeDelayN > 0 {
  114. db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
  115. db.writeDelay = 0
  116. db.writeDelayN = 0
  117. }
  118. return
  119. }
  120. // Write apply the given batch to the DB. The batch will be applied
  121. // sequentially.
  122. //
  123. // It is safe to modify the contents of the arguments after Write returns.
  124. func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
  125. err = db.ok()
  126. if err != nil || b == nil || b.Len() == 0 {
  127. return
  128. }
  129. b.init(wo.GetSync() && !db.s.o.GetNoSync())
  130. if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
  131. // Writes using transaction.
  132. tr, err1 := db.OpenTransaction()
  133. if err1 != nil {
  134. return err1
  135. }
  136. if err1 := tr.Write(b, wo); err1 != nil {
  137. tr.Discard()
  138. return err1
  139. }
  140. return tr.Commit()
  141. }
  142. // The write happen synchronously.
  143. select {
  144. case db.writeC <- b:
  145. if <-db.writeMergedC {
  146. return <-db.writeAckC
  147. }
  148. // Continue, the write lock already acquired by previous writer
  149. // and handed out to us.
  150. case db.writeLockC <- struct{}{}:
  151. case err = <-db.compPerErrC:
  152. return
  153. case _, _ = <-db.closeC:
  154. return ErrClosed
  155. }
  156. merged := 0
  157. danglingMerge := false
  158. defer func() {
  159. for i := 0; i < merged; i++ {
  160. db.writeAckC <- err
  161. }
  162. if danglingMerge {
  163. // Only one dangling merge at most, so this is safe.
  164. db.writeMergedC <- false
  165. } else {
  166. <-db.writeLockC
  167. }
  168. }()
  169. mdb, mdbFree, err := db.flush(b.size())
  170. if err != nil {
  171. return
  172. }
  173. defer mdb.decref()
  174. // Calculate maximum size of the batch.
  175. m := 1 << 20
  176. if x := b.size(); x <= 128<<10 {
  177. m = x + (128 << 10)
  178. }
  179. m = minInt(m, mdbFree)
  180. // Merge with other batch.
  181. drain:
  182. for b.size() < m && !b.sync {
  183. select {
  184. case nb := <-db.writeC:
  185. if b.size()+nb.size() <= m {
  186. b.append(nb)
  187. db.writeMergedC <- true
  188. merged++
  189. } else {
  190. danglingMerge = true
  191. break drain
  192. }
  193. default:
  194. break drain
  195. }
  196. }
  197. // Set batch first seq number relative from last seq.
  198. b.seq = db.seq + 1
  199. // Write journal concurrently if it is large enough.
  200. if b.size() >= (128 << 10) {
  201. // Push the write batch to the journal writer
  202. select {
  203. case db.journalC <- b:
  204. // Write into memdb
  205. if berr := b.memReplay(mdb.DB); berr != nil {
  206. panic(berr)
  207. }
  208. case err = <-db.compPerErrC:
  209. return
  210. case _, _ = <-db.closeC:
  211. err = ErrClosed
  212. return
  213. }
  214. // Wait for journal writer
  215. select {
  216. case err = <-db.journalAckC:
  217. if err != nil {
  218. // Revert memdb if error detected
  219. if berr := b.revertMemReplay(mdb.DB); berr != nil {
  220. panic(berr)
  221. }
  222. return
  223. }
  224. case _, _ = <-db.closeC:
  225. err = ErrClosed
  226. return
  227. }
  228. } else {
  229. err = db.writeJournal(b)
  230. if err != nil {
  231. return
  232. }
  233. if berr := b.memReplay(mdb.DB); berr != nil {
  234. panic(berr)
  235. }
  236. }
  237. // Set last seq number.
  238. db.addSeq(uint64(b.Len()))
  239. if b.size() >= mdbFree {
  240. db.rotateMem(0, false)
  241. }
  242. return
  243. }
  244. // Put sets the value for the given key. It overwrites any previous value
  245. // for that key; a DB is not a multi-map.
  246. //
  247. // It is safe to modify the contents of the arguments after Put returns.
  248. func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
  249. b := new(Batch)
  250. b.Put(key, value)
  251. return db.Write(b, wo)
  252. }
  253. // Delete deletes the value for the given key.
  254. //
  255. // It is safe to modify the contents of the arguments after Delete returns.
  256. func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
  257. b := new(Batch)
  258. b.Delete(key)
  259. return db.Write(b, wo)
  260. }
  261. func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
  262. iter := mem.NewIterator(nil)
  263. defer iter.Release()
  264. return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
  265. (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
  266. }
  267. // CompactRange compacts the underlying DB for the given key range.
  268. // In particular, deleted and overwritten versions are discarded,
  269. // and the data is rearranged to reduce the cost of operations
  270. // needed to access the data. This operation should typically only
  271. // be invoked by users who understand the underlying implementation.
  272. //
  273. // A nil Range.Start is treated as a key before all keys in the DB.
  274. // And a nil Range.Limit is treated as a key after all keys in the DB.
  275. // Therefore if both is nil then it will compact entire DB.
  276. func (db *DB) CompactRange(r util.Range) error {
  277. if err := db.ok(); err != nil {
  278. return err
  279. }
  280. // Lock writer.
  281. select {
  282. case db.writeLockC <- struct{}{}:
  283. case err := <-db.compPerErrC:
  284. return err
  285. case _, _ = <-db.closeC:
  286. return ErrClosed
  287. }
  288. // Check for overlaps in memdb.
  289. mdb := db.getEffectiveMem()
  290. if mdb == nil {
  291. return ErrClosed
  292. }
  293. defer mdb.decref()
  294. if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
  295. // Memdb compaction.
  296. if _, err := db.rotateMem(0, false); err != nil {
  297. <-db.writeLockC
  298. return err
  299. }
  300. <-db.writeLockC
  301. if err := db.compTriggerWait(db.mcompCmdC); err != nil {
  302. return err
  303. }
  304. } else {
  305. <-db.writeLockC
  306. }
  307. // Table compaction.
  308. return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
  309. }
  310. // SetReadOnly makes DB read-only. It will stay read-only until reopened.
  311. func (db *DB) SetReadOnly() error {
  312. if err := db.ok(); err != nil {
  313. return err
  314. }
  315. // Lock writer.
  316. select {
  317. case db.writeLockC <- struct{}{}:
  318. db.compWriteLocking = true
  319. case err := <-db.compPerErrC:
  320. return err
  321. case _, _ = <-db.closeC:
  322. return ErrClosed
  323. }
  324. // Set compaction read-only.
  325. select {
  326. case db.compErrSetC <- ErrReadOnly:
  327. case perr := <-db.compPerErrC:
  328. return perr
  329. case _, _ = <-db.closeC:
  330. return ErrClosed
  331. }
  332. return nil
  333. }