file.go 24 KB


  1. // Copyright 2014 The ql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Well known handles
  5. // 1: root
  6. // 2: id
  7. package ql
  8. import (
  9. "crypto/sha1"
  10. "fmt"
  11. "io"
  12. "io/ioutil"
  13. "math/big"
  14. "os"
  15. "path/filepath"
  16. "sync"
  17. "time"
  18. "github.com/cznic/lldb"
  19. "github.com/cznic/mathutil"
  20. "github.com/cznic/ql/vendored/github.com/camlistore/go4/lock"
  21. )
  22. const (
  23. magic = "\x60\xdbql"
  24. )
  25. var (
  26. _ btreeIndex = (*fileIndex)(nil)
  27. _ btreeIterator = (*fileBTreeIterator)(nil)
  28. _ indexIterator = (*fileIndexIterator)(nil)
  29. _ storage = (*file)(nil)
  30. _ temp = (*fileTemp)(nil)
  31. )
  32. type chunk struct { // expanded to blob types lazily
  33. f *file
  34. b []byte
  35. }
  36. func (c chunk) expand() (v interface{}, err error) {
  37. return c.f.loadChunks(c.b)
  38. }
  39. func expand1(data interface{}, e error) (v interface{}, err error) {
  40. if e != nil {
  41. return nil, e
  42. }
  43. c, ok := data.(chunk)
  44. if !ok {
  45. return data, nil
  46. }
  47. return c.expand()
  48. }
  49. func expand(data []interface{}) (err error) {
  50. for i, v := range data {
  51. if data[i], err = expand1(v, nil); err != nil {
  52. return
  53. }
  54. }
  55. return
  56. }
  57. // OpenFile returns a DB backed by a named file. The back end limits the size
  58. // of a record to about 64 kB.
  59. func OpenFile(name string, opt *Options) (db *DB, err error) {
  60. var f lldb.OSFile
  61. if f = opt.OSFile; f == nil {
  62. f, err = os.OpenFile(name, os.O_RDWR, 0666)
  63. if err != nil {
  64. if !os.IsNotExist(err) {
  65. return nil, err
  66. }
  67. if !opt.CanCreate {
  68. return nil, err
  69. }
  70. f, err = os.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0666)
  71. if err != nil {
  72. return nil, err
  73. }
  74. }
  75. }
  76. fi, err := newFileFromOSFile(f) // always ACID
  77. if err != nil {
  78. return
  79. }
  80. if fi.tempFile = opt.TempFile; fi.tempFile == nil {
  81. fi.tempFile = func(dir, prefix string) (f lldb.OSFile, err error) {
  82. f0, err := ioutil.TempFile(dir, prefix)
  83. return f0, err
  84. }
  85. }
  86. return newDB(fi)
  87. }
  88. // Options amend the behavior of OpenFile.
  89. //
  90. // CanCreate
  91. //
  92. // The CanCreate option enables OpenFile to create the DB file if it does not
  93. // exists.
  94. //
  95. // OSFile
  96. //
  97. // OSFile allows to pass an os.File like back end providing, for example,
  98. // encrypted storage. If this field is nil then OpenFile uses the file named by
  99. // the 'name' parameter instead.
  100. //
  101. // TempFile
  102. //
  103. // TempFile provides a temporary file used for evaluating the GROUP BY, ORDER
  104. // BY, ... clauses. The hook is intended to be used by encrypted DB back ends
  105. // to avoid leaks of unecrypted data to such temp files by providing temp files
  106. // which are encrypted as well. Note that *os.File satisfies the lldb.OSFile
  107. // interface.
  108. //
  109. // If TempFile is nil it defaults to ioutil.TempFile.
  110. type Options struct {
  111. CanCreate bool
  112. OSFile lldb.OSFile
  113. TempFile func(dir, prefix string) (f lldb.OSFile, err error)
  114. }
  115. type fileBTreeIterator struct {
  116. en *lldb.BTreeEnumerator
  117. t *fileTemp
  118. }
  119. func (it *fileBTreeIterator) Next() (k, v []interface{}, err error) {
  120. bk, bv, err := it.en.Next()
  121. if err != nil {
  122. return
  123. }
  124. if k, err = lldb.DecodeScalars(bk); err != nil {
  125. return
  126. }
  127. for i, val := range k {
  128. b, ok := val.([]byte)
  129. if !ok {
  130. continue
  131. }
  132. c := chunk{it.t.file, b}
  133. if k[i], err = c.expand(); err != nil {
  134. return nil, nil, err
  135. }
  136. }
  137. if err = enforce(k, it.t.colsK); err != nil {
  138. return
  139. }
  140. if v, err = lldb.DecodeScalars(bv); err != nil {
  141. return
  142. }
  143. for i, val := range v {
  144. b, ok := val.([]byte)
  145. if !ok {
  146. continue
  147. }
  148. c := chunk{it.t.file, b}
  149. if v[i], err = c.expand(); err != nil {
  150. return nil, nil, err
  151. }
  152. }
  153. err = enforce(v, it.t.colsV)
  154. return
  155. }
  156. func enforce(val []interface{}, cols []*col) (err error) {
  157. for i, v := range val {
  158. if val[i], err = convert(v, cols[i].typ); err != nil {
  159. return
  160. }
  161. }
  162. return
  163. }
  164. //NTYPE
  165. func infer(from []interface{}, to *[]*col) {
  166. if len(*to) == 0 {
  167. *to = make([]*col, len(from))
  168. for i := range *to {
  169. (*to)[i] = &col{}
  170. }
  171. }
  172. for i, c := range *to {
  173. if f := from[i]; f != nil {
  174. switch x := f.(type) {
  175. //case nil:
  176. case idealComplex:
  177. c.typ = qComplex128
  178. from[i] = complex128(x)
  179. case idealFloat:
  180. c.typ = qFloat64
  181. from[i] = float64(x)
  182. case idealInt:
  183. c.typ = qInt64
  184. from[i] = int64(x)
  185. case idealRune:
  186. c.typ = qInt32
  187. from[i] = int32(x)
  188. case idealUint:
  189. c.typ = qUint64
  190. from[i] = uint64(x)
  191. case bool:
  192. c.typ = qBool
  193. case complex128:
  194. c.typ = qComplex128
  195. case complex64:
  196. c.typ = qComplex64
  197. case float64:
  198. c.typ = qFloat64
  199. case float32:
  200. c.typ = qFloat32
  201. case int8:
  202. c.typ = qInt8
  203. case int16:
  204. c.typ = qInt16
  205. case int32:
  206. c.typ = qInt32
  207. case int64:
  208. c.typ = qInt64
  209. case string:
  210. c.typ = qString
  211. case uint8:
  212. c.typ = qUint8
  213. case uint16:
  214. c.typ = qUint16
  215. case uint32:
  216. c.typ = qUint32
  217. case uint64:
  218. c.typ = qUint64
  219. case []byte:
  220. c.typ = qBlob
  221. case *big.Int:
  222. c.typ = qBigInt
  223. case *big.Rat:
  224. c.typ = qBigRat
  225. case time.Time:
  226. c.typ = qTime
  227. case time.Duration:
  228. c.typ = qDuration
  229. case chunk:
  230. vals, err := lldb.DecodeScalars([]byte(x.b))
  231. if err != nil {
  232. panic(err)
  233. }
  234. if len(vals) == 0 {
  235. panic("internal error 040")
  236. }
  237. i, ok := vals[0].(int64)
  238. if !ok {
  239. panic("internal error 041")
  240. }
  241. c.typ = int(i)
  242. case map[string]interface{}: // map of ids of a cross join
  243. default:
  244. panic("internal error 042")
  245. }
  246. }
  247. }
  248. }
  249. type fileTemp struct {
  250. *file
  251. colsK []*col
  252. colsV []*col
  253. t *lldb.BTree
  254. }
  255. func (t *fileTemp) BeginTransaction() error {
  256. return nil
  257. }
  258. func (t *fileTemp) Get(k []interface{}) (v []interface{}, err error) {
  259. if err = expand(k); err != nil {
  260. return
  261. }
  262. if err = t.flatten(k); err != nil {
  263. return nil, err
  264. }
  265. bk, err := lldb.EncodeScalars(k...)
  266. if err != nil {
  267. return
  268. }
  269. bv, err := t.t.Get(nil, bk)
  270. if err != nil {
  271. return
  272. }
  273. return lldb.DecodeScalars(bv)
  274. }
  275. func (t *fileTemp) Drop() (err error) {
  276. if t.f0 == nil {
  277. return
  278. }
  279. fn := t.f0.Name()
  280. if err = t.f0.Close(); err != nil {
  281. return
  282. }
  283. if fn == "" {
  284. return
  285. }
  286. return os.Remove(fn)
  287. }
  288. func (t *fileTemp) SeekFirst() (it btreeIterator, err error) {
  289. en, err := t.t.SeekFirst()
  290. if err != nil {
  291. return
  292. }
  293. return &fileBTreeIterator{t: t, en: en}, nil
  294. }
  295. func (t *fileTemp) Set(k, v []interface{}) (err error) {
  296. if err = expand(k); err != nil {
  297. return
  298. }
  299. if err = expand(v); err != nil {
  300. return
  301. }
  302. infer(k, &t.colsK)
  303. infer(v, &t.colsV)
  304. if err = t.flatten(k); err != nil {
  305. return
  306. }
  307. bk, err := lldb.EncodeScalars(k...)
  308. if err != nil {
  309. return
  310. }
  311. if err = t.flatten(v); err != nil {
  312. return
  313. }
  314. bv, err := lldb.EncodeScalars(v...)
  315. if err != nil {
  316. return
  317. }
  318. return t.t.Set(bk, bv)
  319. }
  320. type file struct {
  321. a *lldb.Allocator
  322. codec *gobCoder
  323. f lldb.Filer
  324. f0 lldb.OSFile
  325. id int64
  326. lck io.Closer
  327. mu sync.Mutex
  328. name string
  329. tempFile func(dir, prefix string) (f lldb.OSFile, err error)
  330. wal *os.File
  331. }
  332. func newFileFromOSFile(f lldb.OSFile) (fi *file, err error) {
  333. nm := lockName(f.Name())
  334. lck, err := lock.Lock(nm)
  335. if err != nil {
  336. if lck != nil {
  337. lck.Close()
  338. }
  339. return nil, err
  340. }
  341. close := true
  342. defer func() {
  343. if close && lck != nil {
  344. lck.Close()
  345. }
  346. }()
  347. var w *os.File
  348. closew := false
  349. wn := walName(f.Name())
  350. w, err = os.OpenFile(wn, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0666)
  351. closew = true
  352. defer func() {
  353. if w != nil && closew {
  354. nm := w.Name()
  355. w.Close()
  356. os.Remove(nm)
  357. w = nil
  358. }
  359. }()
  360. if err != nil {
  361. if !os.IsExist(err) {
  362. return nil, err
  363. }
  364. closew = false
  365. w, err = os.OpenFile(wn, os.O_RDWR, 0666)
  366. if err != nil {
  367. return nil, err
  368. }
  369. closew = true
  370. st, err := w.Stat()
  371. if err != nil {
  372. return nil, err
  373. }
  374. if st.Size() != 0 {
  375. return nil, fmt.Errorf("(file-001) non empty WAL file %s exists", wn)
  376. }
  377. }
  378. info, err := f.Stat()
  379. if err != nil {
  380. return nil, err
  381. }
  382. switch sz := info.Size(); {
  383. case sz == 0:
  384. b := make([]byte, 16)
  385. copy(b, []byte(magic))
  386. if _, err := f.Write(b); err != nil {
  387. return nil, err
  388. }
  389. filer := lldb.Filer(lldb.NewOSFiler(f))
  390. filer = lldb.NewInnerFiler(filer, 16)
  391. if filer, err = lldb.NewACIDFiler(filer, w); err != nil {
  392. return nil, err
  393. }
  394. a, err := lldb.NewAllocator(filer, &lldb.Options{})
  395. if err != nil {
  396. return nil, err
  397. }
  398. a.Compress = true
  399. s := &file{
  400. a: a,
  401. codec: newGobCoder(),
  402. f0: f,
  403. f: filer,
  404. lck: lck,
  405. name: f.Name(),
  406. wal: w,
  407. }
  408. if err = s.BeginTransaction(); err != nil {
  409. return nil, err
  410. }
  411. h, err := s.Create()
  412. if err != nil {
  413. return nil, err
  414. }
  415. if h != 1 { // root
  416. panic("internal error 043")
  417. }
  418. if h, err = s.a.Alloc(make([]byte, 8)); err != nil {
  419. return nil, err
  420. }
  421. if h != 2 { // id
  422. panic("internal error 044")
  423. }
  424. close, closew = false, false
  425. return s, s.Commit()
  426. default:
  427. b := make([]byte, 16)
  428. if _, err := f.Read(b); err != nil {
  429. return nil, err
  430. }
  431. if string(b[:len(magic)]) != magic {
  432. return nil, fmt.Errorf("(file-002) unknown file format")
  433. }
  434. filer := lldb.Filer(lldb.NewOSFiler(f))
  435. filer = lldb.NewInnerFiler(filer, 16)
  436. if filer, err = lldb.NewACIDFiler(filer, w); err != nil {
  437. return nil, err
  438. }
  439. a, err := lldb.NewAllocator(filer, &lldb.Options{})
  440. if err != nil {
  441. return nil, err
  442. }
  443. bid, err := a.Get(nil, 2) // id
  444. if err != nil {
  445. return nil, err
  446. }
  447. if len(bid) != 8 {
  448. return nil, fmt.Errorf("(file-003) corrupted DB: id |% x|", bid)
  449. }
  450. id := int64(0)
  451. for _, v := range bid {
  452. id = (id << 8) | int64(v)
  453. }
  454. a.Compress = true
  455. s := &file{
  456. a: a,
  457. codec: newGobCoder(),
  458. f0: f,
  459. f: filer,
  460. id: id,
  461. lck: lck,
  462. name: f.Name(),
  463. wal: w,
  464. }
  465. close, closew = false, false
  466. return s, nil
  467. }
  468. }
  469. func (s *file) OpenIndex(unique bool, handle int64) (btreeIndex, error) {
  470. t, err := lldb.OpenBTree(s.a, s.collate, handle)
  471. if err != nil {
  472. return nil, err
  473. }
  474. return &fileIndex{s, handle, t, unique, newGobCoder()}, nil
  475. }
  476. func (s *file) CreateIndex(unique bool) ( /* handle */ int64, btreeIndex, error) {
  477. t, h, err := lldb.CreateBTree(s.a, s.collate)
  478. if err != nil {
  479. return -1, nil, err
  480. }
  481. return h, &fileIndex{s, h, t, unique, newGobCoder()}, nil
  482. }
  483. func (s *file) Acid() bool { return s.wal != nil }
  484. func errSet(p *error, errs ...error) (err error) {
  485. err = *p
  486. for _, e := range errs {
  487. if err != nil {
  488. return
  489. }
  490. *p, err = e, e
  491. }
  492. return
  493. }
  494. func (s *file) lock() func() {
  495. s.mu.Lock()
  496. return s.mu.Unlock
  497. }
  498. func (s *file) Close() (err error) {
  499. defer s.lock()()
  500. es := s.f0.Sync()
  501. ef := s.f0.Close()
  502. var ew error
  503. if s.wal != nil {
  504. ew = s.wal.Close()
  505. }
  506. el := s.lck.Close()
  507. return errSet(&err, es, ef, ew, el)
  508. }
  509. func (s *file) Name() string { return s.name }
  510. func (s *file) Verify() (allocs int64, err error) {
  511. defer s.lock()()
  512. var stat lldb.AllocStats
  513. if err = s.a.Verify(lldb.NewMemFiler(), nil, &stat); err != nil {
  514. return
  515. }
  516. allocs = stat.AllocAtoms
  517. return
  518. }
  519. func (s *file) expandBytes(d []interface{}) (err error) {
  520. for i, v := range d {
  521. b, ok := v.([]byte)
  522. if !ok {
  523. continue
  524. }
  525. d[i], err = s.loadChunks(b)
  526. if err != nil {
  527. return
  528. }
  529. }
  530. return
  531. }
  532. func (s *file) collate(a, b []byte) int { //TODO w/ error return
  533. da, err := lldb.DecodeScalars(a)
  534. if err != nil {
  535. panic(err)
  536. }
  537. if err = s.expandBytes(da); err != nil {
  538. panic(err)
  539. }
  540. db, err := lldb.DecodeScalars(b)
  541. if err != nil {
  542. panic(err)
  543. }
  544. if err = s.expandBytes(db); err != nil {
  545. panic(err)
  546. }
  547. //dbg("da: %v, db: %v", da, db)
  548. return collate(da, db)
  549. }
  550. func (s *file) CreateTemp(asc bool) (bt temp, err error) {
  551. f, err := s.tempFile("", "ql-tmp-")
  552. if err != nil {
  553. return nil, err
  554. }
  555. fn := f.Name()
  556. filer := lldb.NewOSFiler(f)
  557. a, err := lldb.NewAllocator(filer, &lldb.Options{})
  558. if err != nil {
  559. f.Close()
  560. os.Remove(fn)
  561. return nil, err
  562. }
  563. k := 1
  564. if !asc {
  565. k = -1
  566. }
  567. t, _, err := lldb.CreateBTree(a, func(a, b []byte) int { //TODO w/ error return
  568. return k * s.collate(a, b)
  569. })
  570. if err != nil {
  571. f.Close()
  572. if fn != "" {
  573. os.Remove(fn)
  574. }
  575. return nil, err
  576. }
  577. x := &fileTemp{file: &file{
  578. a: a,
  579. codec: newGobCoder(),
  580. f0: f,
  581. },
  582. t: t}
  583. return x, nil
  584. }
  585. func (s *file) BeginTransaction() (err error) {
  586. defer s.lock()()
  587. return s.f.BeginUpdate()
  588. }
  589. func (s *file) Rollback() (err error) {
  590. defer s.lock()()
  591. return s.f.Rollback()
  592. }
  593. func (s *file) Commit() (err error) {
  594. defer s.lock()()
  595. return s.f.EndUpdate()
  596. }
  597. func (s *file) Create(data ...interface{}) (h int64, err error) {
  598. if err = expand(data); err != nil {
  599. return
  600. }
  601. if err = s.flatten(data); err != nil {
  602. return
  603. }
  604. b, err := lldb.EncodeScalars(data...)
  605. if err != nil {
  606. return
  607. }
  608. defer s.lock()()
  609. return s.a.Alloc(b)
  610. }
  611. func (s *file) Delete(h int64, blobCols ...*col) (err error) {
  612. switch len(blobCols) {
  613. case 0:
  614. defer s.lock()()
  615. return s.a.Free(h)
  616. default:
  617. return s.free(h, blobCols)
  618. }
  619. }
  620. func (s *file) ResetID() (err error) {
  621. s.id = 0
  622. return
  623. }
  624. func (s *file) ID() (int64, error) {
  625. defer s.lock()()
  626. s.id++
  627. b := make([]byte, 8)
  628. id := s.id
  629. for i := 7; i >= 0; i-- {
  630. b[i] = byte(id)
  631. id >>= 8
  632. }
  633. return s.id, s.a.Realloc(2, b)
  634. }
  635. func (s *file) free(h int64, blobCols []*col) (err error) {
  636. b, err := s.a.Get(nil, h) //LATER +bufs
  637. if err != nil {
  638. return
  639. }
  640. rec, err := lldb.DecodeScalars(b)
  641. if err != nil {
  642. return
  643. }
  644. for _, col := range blobCols {
  645. if col.index >= len(rec) {
  646. return fmt.Errorf("(file-004) file.free: corrupted DB (record len)")
  647. }
  648. if col.index+2 >= len(rec) {
  649. continue
  650. }
  651. switch x := rec[col.index+2].(type) {
  652. case nil:
  653. // nop
  654. case []byte:
  655. if err = s.freeChunks(x); err != nil {
  656. return
  657. }
  658. }
  659. }
  660. defer s.lock()()
  661. return s.a.Free(h)
  662. }
  663. func (s *file) Read(dst []interface{}, h int64, cols ...*col) (data []interface{}, err error) { //NTYPE
  664. b, err := s.a.Get(nil, h) //LATER +bufs
  665. if err != nil {
  666. return
  667. }
  668. rec, err := lldb.DecodeScalars(b)
  669. if err != nil {
  670. return
  671. }
  672. for _, col := range cols {
  673. i := col.index + 2
  674. if i >= len(rec) || rec[i] == nil {
  675. continue
  676. }
  677. switch col.typ {
  678. case 0:
  679. case qBool:
  680. case qComplex64:
  681. rec[i] = complex64(rec[i].(complex128))
  682. case qComplex128:
  683. case qFloat32:
  684. rec[i] = float32(rec[i].(float64))
  685. case qFloat64:
  686. case qInt8:
  687. rec[i] = int8(rec[i].(int64))
  688. case qInt16:
  689. rec[i] = int16(rec[i].(int64))
  690. case qInt32:
  691. rec[i] = int32(rec[i].(int64))
  692. case qInt64:
  693. case qString:
  694. case qUint8:
  695. rec[i] = uint8(rec[i].(uint64))
  696. case qUint16:
  697. rec[i] = uint16(rec[i].(uint64))
  698. case qUint32:
  699. rec[i] = uint32(rec[i].(uint64))
  700. case qUint64:
  701. case qBlob, qBigInt, qBigRat, qTime, qDuration:
  702. switch x := rec[i].(type) {
  703. case []byte:
  704. rec[i] = chunk{f: s, b: x}
  705. default:
  706. return nil, fmt.Errorf("(file-006) corrupted DB: non nil chunk type is not []byte")
  707. }
  708. default:
  709. panic("internal error 045")
  710. }
  711. }
  712. if cols != nil {
  713. for n, dn := len(cols)+2, len(rec); dn < n; dn++ {
  714. rec = append(rec, nil)
  715. }
  716. }
  717. return rec, nil
  718. }
  719. func (s *file) freeChunks(enc []byte) (err error) {
  720. items, err := lldb.DecodeScalars(enc)
  721. if err != nil {
  722. return
  723. }
  724. var ok bool
  725. var next int64
  726. switch len(items) {
  727. case 2:
  728. return
  729. case 3:
  730. if next, ok = items[1].(int64); !ok || next == 0 {
  731. return fmt.Errorf("(file-007) corrupted DB: first chunk link")
  732. }
  733. default:
  734. return fmt.Errorf("(file-008) corrupted DB: first chunk")
  735. }
  736. for next != 0 {
  737. b, err := s.a.Get(nil, next)
  738. if err != nil {
  739. return err
  740. }
  741. if items, err = lldb.DecodeScalars(b); err != nil {
  742. return err
  743. }
  744. var h int64
  745. switch len(items) {
  746. case 1:
  747. // nop
  748. case 2:
  749. if h, ok = items[0].(int64); !ok {
  750. return fmt.Errorf("(file-009) corrupted DB: chunk link")
  751. }
  752. default:
  753. return fmt.Errorf("(file-010) corrupted DB: chunk items %d (%v)", len(items), items)
  754. }
  755. s.mu.Lock()
  756. if err = s.a.Free(next); err != nil {
  757. s.mu.Unlock()
  758. return err
  759. }
  760. s.mu.Unlock()
  761. next = h
  762. }
  763. return
  764. }
  765. func (s *file) loadChunks(enc []byte) (v interface{}, err error) {
  766. items, err := lldb.DecodeScalars(enc)
  767. if err != nil {
  768. return
  769. }
  770. var ok bool
  771. var next int64
  772. switch len(items) {
  773. case 2:
  774. // nop
  775. case 3:
  776. if next, ok = items[1].(int64); !ok || next == 0 {
  777. return nil, fmt.Errorf("(file-011) corrupted DB: first chunk link")
  778. }
  779. default:
  780. //fmt.Printf("%d: %#v\n", len(items), items)
  781. return nil, fmt.Errorf("(file-012) corrupted DB: first chunk")
  782. }
  783. typ, ok := items[0].(int64)
  784. if !ok {
  785. return nil, fmt.Errorf("(file-013) corrupted DB: first chunk tag")
  786. }
  787. buf, ok := items[len(items)-1].([]byte)
  788. if !ok {
  789. return nil, fmt.Errorf("(file-014) corrupted DB: first chunk data")
  790. }
  791. for next != 0 {
  792. b, err := s.a.Get(nil, next)
  793. if err != nil {
  794. return nil, err
  795. }
  796. if items, err = lldb.DecodeScalars(b); err != nil {
  797. return nil, err
  798. }
  799. switch len(items) {
  800. case 1:
  801. next = 0
  802. case 2:
  803. if next, ok = items[0].(int64); !ok {
  804. return nil, fmt.Errorf("(file-015) corrupted DB: chunk link")
  805. }
  806. items = items[1:]
  807. default:
  808. return nil, fmt.Errorf("(file-016) corrupted DB: chunk items %d (%v)", len(items), items)
  809. }
  810. if b, ok = items[0].([]byte); !ok {
  811. return nil, fmt.Errorf("(file-017) corrupted DB: chunk data")
  812. }
  813. buf = append(buf, b...)
  814. }
  815. return s.codec.decode(buf, int(typ))
  816. }
  817. func (s *file) Update(h int64, data ...interface{}) (err error) {
  818. b, err := lldb.EncodeScalars(data...)
  819. if err != nil {
  820. return
  821. }
  822. defer s.lock()()
  823. return s.a.Realloc(h, b)
  824. }
  825. func (s *file) UpdateRow(h int64, blobCols []*col, data ...interface{}) (err error) {
  826. if len(blobCols) == 0 {
  827. return s.Update(h, data...)
  828. }
  829. if err = expand(data); err != nil {
  830. return
  831. }
  832. data0, err := s.Read(nil, h, blobCols...)
  833. if err != nil {
  834. return
  835. }
  836. for _, c := range blobCols {
  837. if c.index+2 >= len(data0) {
  838. continue
  839. }
  840. if x := data0[c.index+2]; x != nil {
  841. if err = s.freeChunks(x.(chunk).b); err != nil {
  842. return
  843. }
  844. }
  845. }
  846. if err = s.flatten(data); err != nil {
  847. return
  848. }
  849. return s.Update(h, data...)
  850. }
  851. // []interface{}{qltype, ...}->[]interface{}{lldb scalar type, ...}
  852. // + long blobs are (pre)written to a chain of chunks.
  853. func (s *file) flatten(data []interface{}) (err error) {
  854. for i, v := range data {
  855. tag := 0
  856. var b []byte
  857. switch x := v.(type) {
  858. case []byte:
  859. tag = qBlob
  860. b = x
  861. case *big.Int:
  862. tag = qBigInt
  863. b, err = s.codec.encode(x)
  864. case *big.Rat:
  865. tag = qBigRat
  866. b, err = s.codec.encode(x)
  867. case time.Time:
  868. tag = qTime
  869. b, err = s.codec.encode(x)
  870. case time.Duration:
  871. tag = qDuration
  872. b, err = s.codec.encode(x)
  873. default:
  874. continue
  875. }
  876. if err != nil {
  877. return
  878. }
  879. const chunk = 1 << 16
  880. chunks := 0
  881. var next int64
  882. var buf []byte
  883. for rem := len(b); rem > shortBlob; {
  884. n := mathutil.Min(rem, chunk)
  885. part := b[rem-n:]
  886. b = b[:rem-n]
  887. rem -= n
  888. switch next {
  889. case 0: // last chunk
  890. buf, err = lldb.EncodeScalars([]interface{}{part}...)
  891. default: // middle chunk
  892. buf, err = lldb.EncodeScalars([]interface{}{next, part}...)
  893. }
  894. if err != nil {
  895. return
  896. }
  897. s.mu.Lock()
  898. h, err := s.a.Alloc(buf)
  899. s.mu.Unlock()
  900. if err != nil {
  901. return err
  902. }
  903. next = h
  904. chunks++
  905. }
  906. switch next {
  907. case 0: // single chunk
  908. buf, err = lldb.EncodeScalars([]interface{}{tag, b}...)
  909. default: // multi chunks
  910. buf, err = lldb.EncodeScalars([]interface{}{tag, next, b}...)
  911. }
  912. if err != nil {
  913. return
  914. }
  915. data[i] = buf
  916. }
  917. return
  918. }
  919. func lockName(dbname string) string {
  920. base := filepath.Base(filepath.Clean(dbname)) + "lockfile"
  921. h := sha1.New()
  922. io.WriteString(h, base)
  923. return filepath.Join(filepath.Dir(dbname), fmt.Sprintf(".%x", h.Sum(nil)))
  924. }
  925. func walName(dbname string) (r string) {
  926. base := filepath.Base(filepath.Clean(dbname))
  927. h := sha1.New()
  928. io.WriteString(h, base)
  929. return filepath.Join(filepath.Dir(dbname), fmt.Sprintf(".%x", h.Sum(nil)))
  930. }
  931. type fileIndex struct {
  932. f *file
  933. h int64
  934. t *lldb.BTree
  935. unique bool
  936. codec *gobCoder
  937. }
  938. func (x *fileIndex) Clear() error {
  939. return x.t.Clear()
  940. }
  941. var gbZeroInt64 []byte
  942. func init() {
  943. var err error
  944. if gbZeroInt64, err = lldb.EncodeScalars(int64(0)); err != nil {
  945. panic(err)
  946. }
  947. }
  948. func isIndexNull(data []interface{}) bool {
  949. for _, v := range data {
  950. if v != nil {
  951. return false
  952. }
  953. }
  954. return true
  955. }
  956. // The []byte version of the key in the BTree shares chunks, if any, with
  957. // the value stored in the record.
  958. func (x *fileIndex) Create(indexedValues []interface{}, h int64) error {
  959. for i, indexedValue := range indexedValues {
  960. chunk, ok := indexedValue.(chunk)
  961. if ok {
  962. indexedValues[i] = chunk.b
  963. }
  964. }
  965. t := x.t
  966. switch {
  967. case !x.unique:
  968. k, err := lldb.EncodeScalars(append(indexedValues, h)...)
  969. if err != nil {
  970. return err
  971. }
  972. return t.Set(k, gbZeroInt64)
  973. case isIndexNull(indexedValues): // unique, NULL
  974. k, err := lldb.EncodeScalars(nil, h)
  975. if err != nil {
  976. return err
  977. }
  978. return t.Set(k, gbZeroInt64)
  979. default: // unique, non NULL
  980. k, err := lldb.EncodeScalars(append(indexedValues, int64(0))...)
  981. if err != nil {
  982. return err
  983. }
  984. v, err := lldb.EncodeScalars(h)
  985. if err != nil {
  986. return err
  987. }
  988. _, _, err = t.Put(nil, k, func(key, old []byte) (new []byte, write bool, err error) {
  989. if old == nil {
  990. return v, true, nil
  991. }
  992. return nil, false, fmt.Errorf("(file-018) cannot insert into unique index: duplicate value(s): %v", indexedValues)
  993. })
  994. return err
  995. }
  996. }
  997. func (x *fileIndex) Delete(indexedValues []interface{}, h int64) error {
  998. for i, indexedValue := range indexedValues {
  999. chunk, ok := indexedValue.(chunk)
  1000. if ok {
  1001. indexedValues[i] = chunk.b
  1002. }
  1003. }
  1004. t := x.t
  1005. var k []byte
  1006. var err error
  1007. switch {
  1008. case !x.unique:
  1009. k, err = lldb.EncodeScalars(append(indexedValues, h)...)
  1010. case isIndexNull(indexedValues): // unique, NULL
  1011. k, err = lldb.EncodeScalars(nil, h)
  1012. default: // unique, non NULL
  1013. k, err = lldb.EncodeScalars(append(indexedValues, int64(0))...)
  1014. }
  1015. if err != nil {
  1016. return err
  1017. }
  1018. return t.Delete(k)
  1019. }
  1020. func (x *fileIndex) Drop() error {
  1021. if err := x.Clear(); err != nil {
  1022. return err
  1023. }
  1024. return x.f.a.Free(x.h)
  1025. }
  1026. // []interface{}{qltype, ...}->[]interface{}{lldb scalar type, ...}
  1027. func (x *fileIndex) flatten(data []interface{}) (err error) {
  1028. for i, v := range data {
  1029. tag := 0
  1030. var b []byte
  1031. switch xx := v.(type) {
  1032. case []byte:
  1033. tag = qBlob
  1034. b = xx
  1035. case *big.Int:
  1036. tag = qBigInt
  1037. b, err = x.codec.encode(xx)
  1038. case *big.Rat:
  1039. tag = qBigRat
  1040. b, err = x.codec.encode(xx)
  1041. case time.Time:
  1042. tag = qTime
  1043. b, err = x.codec.encode(xx)
  1044. case time.Duration:
  1045. tag = qDuration
  1046. b, err = x.codec.encode(xx)
  1047. default:
  1048. continue
  1049. }
  1050. if err != nil {
  1051. return
  1052. }
  1053. var buf []byte
  1054. if buf, err = lldb.EncodeScalars([]interface{}{tag, b}...); err != nil {
  1055. return
  1056. }
  1057. data[i] = buf
  1058. }
  1059. return
  1060. }
  1061. func (x *fileIndex) Seek(indexedValues []interface{}) (indexIterator, bool, error) {
  1062. data := append(indexedValues, 0)
  1063. if err := x.flatten(data); err != nil {
  1064. return nil, false, err
  1065. }
  1066. k, err := lldb.EncodeScalars(data...)
  1067. if err != nil {
  1068. return nil, false, err
  1069. }
  1070. en, hit, err := x.t.Seek(k)
  1071. if err != nil {
  1072. return nil, false, err
  1073. }
  1074. return &fileIndexIterator{x.f, en, x.unique}, hit, nil
  1075. }
  1076. func (x *fileIndex) SeekFirst() (iter indexIterator, err error) {
  1077. en, err := x.t.SeekFirst()
  1078. return &fileIndexIterator{x.f, en, x.unique}, err
  1079. }
  1080. func (x *fileIndex) SeekLast() (iter indexIterator, err error) {
  1081. en, err := x.t.SeekLast()
  1082. return &fileIndexIterator{x.f, en, x.unique}, err
  1083. }
  1084. type fileIndexIterator struct {
  1085. f *file
  1086. en *lldb.BTreeEnumerator
  1087. unique bool
  1088. }
  1089. func (i *fileIndexIterator) nextPrev(f func() ([]byte, []byte, error)) ([]interface{}, int64, error) { //TODO(indices) blobs: +test
  1090. bk, bv, err := f()
  1091. if err != nil {
  1092. return nil, -1, err
  1093. }
  1094. dk, err := lldb.DecodeScalars(bk)
  1095. if err != nil {
  1096. return nil, -1, err
  1097. }
  1098. b, ok := dk[0].([]byte)
  1099. if ok {
  1100. dk[0] = chunk{i.f, b}
  1101. if expand(dk[:1]); err != nil {
  1102. return nil, -1, err
  1103. }
  1104. }
  1105. var k indexKey
  1106. k.value = dk[:len(dk)-1]
  1107. switch i.unique {
  1108. case true:
  1109. if isIndexNull(k.value) {
  1110. return nil, dk[len(dk)-1].(int64), nil
  1111. }
  1112. dv, err := lldb.DecodeScalars(bv)
  1113. if err != nil {
  1114. return nil, -1, err
  1115. }
  1116. return k.value, dv[0].(int64), nil
  1117. default:
  1118. return k.value, dk[len(dk)-1].(int64), nil
  1119. }
  1120. }
  1121. func (i *fileIndexIterator) Next() ([]interface{}, int64, error) { //TODO(indices) blobs: +test
  1122. return i.nextPrev(i.en.Next)
  1123. }
  1124. func (i *fileIndexIterator) Prev() ([]interface{}, int64, error) { //TODO(indices) blobs: +test
  1125. return i.nextPrev(i.en.Prev)
  1126. }