2pc.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. // Copyright 2014 The lldb Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Two Phase Commit & Structural ACID
  5. package lldb
  6. import (
  7. "bufio"
  8. "encoding/binary"
  9. "fmt"
  10. "io"
  11. "os"
  12. "github.com/cznic/fileutil"
  13. "github.com/cznic/mathutil"
  14. )
  15. var _ Filer = &ACIDFiler0{} // Ensure ACIDFiler0 is a Filer
// acidWrite records a single write logged to the WAL during the current
// epoch so that it can be applied to the DB once the checkpoint is reached.
type acidWrite struct {
	b   []byte // data to be written
	off int64  // DB offset the data is written at
}
// acidWriter0 is ACIDFiler0 seen through a different method set: it carries
// the WriteAt and writePacket methods which log writes into the WAL. Values
// are converted back and forth with plain pointer conversions.
type acidWriter0 ACIDFiler0
  21. func (a *acidWriter0) WriteAt(b []byte, off int64) (n int, err error) {
  22. f := (*ACIDFiler0)(a)
  23. if f.newEpoch {
  24. f.newEpoch = false
  25. f.data = f.data[:0]
  26. if err = a.writePacket([]interface{}{wpt00Header, walTypeACIDFiler0, ""}); err != nil {
  27. return
  28. }
  29. }
  30. if err = a.writePacket([]interface{}{wpt00WriteData, b, off}); err != nil {
  31. return
  32. }
  33. f.data = append(f.data, acidWrite{b, off})
  34. return len(b), nil
  35. }
  36. func (a *acidWriter0) writePacket(items []interface{}) (err error) {
  37. f := (*ACIDFiler0)(a)
  38. b, err := EncodeScalars(items...)
  39. if err != nil {
  40. return
  41. }
  42. var b4 [4]byte
  43. binary.BigEndian.PutUint32(b4[:], uint32(len(b)))
  44. if _, err = f.bwal.Write(b4[:]); err != nil {
  45. return
  46. }
  47. if _, err = f.bwal.Write(b); err != nil {
  48. return
  49. }
  50. if m := (4 + len(b)) % 16; m != 0 {
  51. var pad [15]byte
  52. _, err = f.bwal.Write(pad[:16-m])
  53. }
  54. return
  55. }
// WAL Packet Tags
//
// The tag is the first item of every WAL packet.
const (
	wpt00Header     = iota // First packet of an epoch: tag, WAL type, "".
	wpt00WriteData         // A logged write: tag, data, offset.
	wpt00Checkpoint        // Closes a committed epoch: tag, size to truncate the DB to.
	wpt00Empty             // Only item of the packet marking an emptied WAL which has a non zero minimum size.
)
// WAL types. The type is the second item of the WAL header packet and is
// verified when the WAL is read back during recovery.
const (
	walTypeACIDFiler0 = iota
)
// ACIDFiler0 is a very simple, synchronous implementation of 2PC. It uses a
// single write ahead log file to provide the structural atomicity
// (BeginUpdate/EndUpdate/Rollback) and durability (DB can be recovered from
// WAL if a crash occurred).
//
// ACIDFiler0 is a Filer.
//
// NOTE: Durable synchronous 2PC involves three fsyncs in this implementation
// (WAL, DB, zero truncated WAL). Where possible, it's recommended to collect
// transactions for, say, one second before performing the two phase commit,
// as the typical performance of rotational hard disks is about a few tens of
// fsyncs per second at most. For an example of such a collective transaction
// approach please see the collecting FSM STT in Dbm's documentation[1].
//
// [1]: http://godoc.org/github.com/cznic/exp/dbm
type ACIDFiler0 struct {
	*RollbackFiler
	bwal       *bufio.Writer // buffered writer on top of wal
	data       []acidWrite   // writes logged in the current epoch, applied to the DB at checkpoint
	newEpoch   bool          // the next logged write must start a new epoch (emit the WAL header first)
	peakWal    int64         // tracks WAL maximum used size
	testHook   bool          // keeps WAL untruncated (once)
	wal        *os.File      // the write ahead log file
	walOptions walOptions    // properties set by the WALOption values passed to NewACIDFiler
}
// walOptions collects the WAL properties which can be amended by WALOption
// values.
type walOptions struct {
	headroom int64 // Minimum WAL size.
}
// WALOption amends WAL properties. NewACIDFiler applies the options in the
// order they are passed; a non nil error returned by an option makes
// NewACIDFiler fail with that error.
type WALOption func(*walOptions) error
  96. // MinWAL sets the minimum size a WAL file will have. The "extra" allocated
  97. // file space serves as a headroom. Commits that fit into the headroom should
  98. // not fail due to 'not enough space on the volume' errors.
  99. //
  100. // The min parameter is first rounded-up to a non negative multiple of the size
  101. // of the Allocator atom.
  102. //
  103. // Note: Setting minimum WAL size may render the DB non-recoverable when a
  104. // crash occurs and the DB is opened in an earlier version of LLDB that does
  105. // not support minimum WAL sizes.
  106. func MinWAL(min int64) WALOption {
  107. min = mathutil.MaxInt64(0, min)
  108. if r := min % 16; r != 0 {
  109. min += 16 - r
  110. }
  111. return func(o *walOptions) error {
  112. o.headroom = min
  113. return nil
  114. }
  115. }
// NewACIDFiler returns a newly created ACIDFiler0 with WAL in wal.
//
// If the WAL is zero sized then a previous clean shutdown of db is taken for
// granted and no recovery procedure is taken.
//
// If the WAL is of non zero size then it is checked for having a
// committed/fully finished transaction not yet been reflected in db. If such
// transaction exists it's committed to db. If the recovery process finishes
// successfully, the WAL is truncated to the minimum WAL size and fsync'ed
// prior to return from NewACIDFiler.
//
// opts allow to amend WAL properties. An error returned by any option is
// returned as is, together with a nil *ACIDFiler0.
//
// NOTE: when wal.Stat, the recovery or NewRollbackFiler fails, the bare
// return hands back err together with whatever r holds at that point; callers
// must not use r when err != nil.
func NewACIDFiler(db Filer, wal *os.File, opts ...WALOption) (r *ACIDFiler0, err error) {
	fi, err := wal.Stat()
	if err != nil {
		return
	}

	r = &ACIDFiler0{wal: wal}
	for _, o := range opts {
		if err := o(&r.walOptions); err != nil {
			return nil, err
		}
	}

	// A non empty WAL may hold a transaction to be replayed into db.
	if fi.Size() != 0 {
		if err = r.recoverDb(db); err != nil {
			return
		}
	}

	r.bwal = bufio.NewWriter(r.wal)
	r.newEpoch = true
	acidWriter := (*acidWriter0)(r)

	// The function literal below is the checkpoint handed to the
	// RollbackFiler; acidWriter receives the writes to be logged. sz is the
	// size db is truncated to after the logged writes were applied.
	if r.RollbackFiler, err = NewRollbackFiler(
		db,
		func(sz int64) (err error) {
			// Checkpoint
			if err = acidWriter.writePacket([]interface{}{wpt00Checkpoint, sz}); err != nil {
				return
			}

			// Make the whole epoch, including the checkpoint
			// packet, durable in the WAL.
			if err = r.bwal.Flush(); err != nil {
				return
			}

			if err = r.wal.Sync(); err != nil {
				return
			}

			var wfi os.FileInfo
			if wfi, err = r.wal.Stat(); err != nil {
				return
			}

			r.peakWal = mathutil.MaxInt64(wfi.Size(), r.peakWal)

			// Phase 1 commit complete

			// Apply the remembered writes to db. A write reaching
			// beyond sz is clipped to end at sz; a write which
			// would end up empty is skipped.
			for _, v := range r.data {
				n := len(v.b)
				if m := v.off + int64(n); m > sz {
					if n -= int(m - sz); n <= 0 {
						continue
					}
				}

				if _, err = db.WriteAt(v.b[:n], v.off); err != nil {
					return err
				}
			}

			if err = db.Truncate(sz); err != nil {
				return
			}

			if err = db.Sync(); err != nil {
				return
			}

			// Phase 2 commit complete

			// testHook, when set, leaves the WAL content in place
			// for this one checkpoint.
			if !r.testHook {
				if err := r.emptyWAL(); err != nil {
					return err
				}
			}

			r.testHook = false
			// Drop anything still buffered and make the next
			// logged write start a new epoch.
			r.bwal.Reset(r.wal)
			r.newEpoch = true
			return r.wal.Sync()
		},
		acidWriter,
	); err != nil {
		return
	}

	return r, nil
}
  200. func (a *ACIDFiler0) emptyWAL() error {
  201. if err := a.wal.Truncate(a.walOptions.headroom); err != nil {
  202. return err
  203. }
  204. if _, err := a.wal.Seek(0, 0); err != nil {
  205. return err
  206. }
  207. if a.walOptions.headroom != 0 {
  208. a.bwal.Reset(a.wal)
  209. if err := (*acidWriter0)(a).writePacket([]interface{}{wpt00Empty}); err != nil {
  210. return err
  211. }
  212. if err := a.bwal.Flush(); err != nil {
  213. return err
  214. }
  215. if _, err := a.wal.Seek(0, 0); err != nil {
  216. return err
  217. }
  218. }
  219. return nil
  220. }
// PeakWALSize reports the maximum size WAL has ever used.
//
// The value is the largest WAL file size observed at a checkpoint, right
// after the WAL was synced, since this ACIDFiler0 was created. It is zero
// before the first checkpoint.
//
// NOTE(review): the value receiver copies the whole ACIDFiler0 on every
// call; the other methods use pointer receivers.
func (a ACIDFiler0) PeakWALSize() int64 {
	return a.peakWal
}
  225. func (a *ACIDFiler0) readPacket(f *bufio.Reader) (items []interface{}, err error) {
  226. var b4 [4]byte
  227. n, err := io.ReadAtLeast(f, b4[:], 4)
  228. if n != 4 {
  229. return
  230. }
  231. ln := int(binary.BigEndian.Uint32(b4[:]))
  232. m := (4 + ln) % 16
  233. padd := (16 - m) % 16
  234. b := make([]byte, ln+padd)
  235. if n, err = io.ReadAtLeast(f, b, len(b)); n != len(b) {
  236. return
  237. }
  238. return DecodeScalars(b[:ln])
  239. }
  240. func (a *ACIDFiler0) recoverDb(db Filer) (err error) {
  241. fi, err := a.wal.Stat()
  242. if err != nil {
  243. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: err}
  244. }
  245. if sz := fi.Size(); sz%16 != 0 {
  246. return &ErrILSEQ{Type: ErrFileSize, Name: a.wal.Name(), Arg: sz}
  247. }
  248. f := bufio.NewReader(a.wal)
  249. items, err := a.readPacket(f)
  250. if err != nil {
  251. return
  252. }
  253. if items[0] == int64(wpt00Empty) {
  254. if len(items) != 1 {
  255. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("invalid packet items %#v", items)}
  256. }
  257. return nil
  258. }
  259. if len(items) != 3 || items[0] != int64(wpt00Header) || items[1] != int64(walTypeACIDFiler0) {
  260. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("invalid packet items %#v", items)}
  261. }
  262. tr := NewBTree(nil)
  263. for {
  264. items, err = a.readPacket(f)
  265. if err != nil {
  266. return
  267. }
  268. if len(items) < 2 {
  269. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("too few packet items %#v", items)}
  270. }
  271. switch items[0] {
  272. case int64(wpt00WriteData):
  273. if len(items) != 3 {
  274. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("invalid data packet items %#v", items)}
  275. }
  276. b, off := items[1].([]byte), items[2].(int64)
  277. var key [8]byte
  278. binary.BigEndian.PutUint64(key[:], uint64(off))
  279. if err = tr.Set(key[:], b); err != nil {
  280. return
  281. }
  282. case int64(wpt00Checkpoint):
  283. var b1 [1]byte
  284. if n, err := f.Read(b1[:]); n != 0 || err == nil {
  285. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("checkpoint n %d, err %v", n, err)}
  286. }
  287. if len(items) != 2 {
  288. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("checkpoint packet invalid items %#v", items)}
  289. }
  290. sz := items[1].(int64)
  291. enum, err := tr.seekFirst()
  292. if err != nil {
  293. return err
  294. }
  295. for {
  296. var k, v []byte
  297. k, v, err = enum.current()
  298. if err != nil {
  299. if fileutil.IsEOF(err) {
  300. break
  301. }
  302. return err
  303. }
  304. if _, err = db.WriteAt(v, int64(binary.BigEndian.Uint64(k))); err != nil {
  305. return err
  306. }
  307. if err = enum.next(); err != nil {
  308. if fileutil.IsEOF(err) {
  309. break
  310. }
  311. return err
  312. }
  313. }
  314. if err = db.Truncate(sz); err != nil {
  315. return err
  316. }
  317. if err = db.Sync(); err != nil {
  318. return err
  319. }
  320. // Recovery complete
  321. if err := a.emptyWAL(); err != nil {
  322. return err
  323. }
  324. return a.wal.Sync()
  325. default:
  326. return &ErrILSEQ{Type: ErrInvalidWAL, Name: a.wal.Name(), More: fmt.Sprintf("packet tag %v", items[0])}
  327. }
  328. }
  329. }