| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323 |
- // Copyright 2014 The ql Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Well known handles
- // 1: root
- // 2: id
- package ql
- import (
- "crypto/sha1"
- "fmt"
- "io"
- "io/ioutil"
- "math/big"
- "os"
- "path/filepath"
- "sync"
- "time"
- "github.com/cznic/lldb"
- "github.com/cznic/mathutil"
- "github.com/cznic/ql/vendored/github.com/camlistore/go4/lock"
- )
- const (
- magic = "\x60\xdbql"
- )
- var (
- _ btreeIndex = (*fileIndex)(nil)
- _ btreeIterator = (*fileBTreeIterator)(nil)
- _ indexIterator = (*fileIndexIterator)(nil)
- _ storage = (*file)(nil)
- _ temp = (*fileTemp)(nil)
- )
- type chunk struct { // expanded to blob types lazily
- f *file
- b []byte
- }
- func (c chunk) expand() (v interface{}, err error) {
- return c.f.loadChunks(c.b)
- }
- func expand1(data interface{}, e error) (v interface{}, err error) {
- if e != nil {
- return nil, e
- }
- c, ok := data.(chunk)
- if !ok {
- return data, nil
- }
- return c.expand()
- }
- func expand(data []interface{}) (err error) {
- for i, v := range data {
- if data[i], err = expand1(v, nil); err != nil {
- return
- }
- }
- return
- }
- // OpenFile returns a DB backed by a named file. The back end limits the size
- // of a record to about 64 kB.
- func OpenFile(name string, opt *Options) (db *DB, err error) {
- var f lldb.OSFile
- if f = opt.OSFile; f == nil {
- f, err = os.OpenFile(name, os.O_RDWR, 0666)
- if err != nil {
- if !os.IsNotExist(err) {
- return nil, err
- }
- if !opt.CanCreate {
- return nil, err
- }
- f, err = os.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0666)
- if err != nil {
- return nil, err
- }
- }
- }
- fi, err := newFileFromOSFile(f) // always ACID
- if err != nil {
- return
- }
- if fi.tempFile = opt.TempFile; fi.tempFile == nil {
- fi.tempFile = func(dir, prefix string) (f lldb.OSFile, err error) {
- f0, err := ioutil.TempFile(dir, prefix)
- return f0, err
- }
- }
- return newDB(fi)
- }
- // Options amend the behavior of OpenFile.
- //
- // CanCreate
- //
- // The CanCreate option enables OpenFile to create the DB file if it does not
- // exists.
- //
- // OSFile
- //
- // OSFile allows to pass an os.File like back end providing, for example,
- // encrypted storage. If this field is nil then OpenFile uses the file named by
- // the 'name' parameter instead.
- //
- // TempFile
- //
- // TempFile provides a temporary file used for evaluating the GROUP BY, ORDER
- // BY, ... clauses. The hook is intended to be used by encrypted DB back ends
- // to avoid leaks of unecrypted data to such temp files by providing temp files
- // which are encrypted as well. Note that *os.File satisfies the lldb.OSFile
- // interface.
- //
- // If TempFile is nil it defaults to ioutil.TempFile.
- type Options struct {
- CanCreate bool
- OSFile lldb.OSFile
- TempFile func(dir, prefix string) (f lldb.OSFile, err error)
- }
- type fileBTreeIterator struct {
- en *lldb.BTreeEnumerator
- t *fileTemp
- }
- func (it *fileBTreeIterator) Next() (k, v []interface{}, err error) {
- bk, bv, err := it.en.Next()
- if err != nil {
- return
- }
- if k, err = lldb.DecodeScalars(bk); err != nil {
- return
- }
- for i, val := range k {
- b, ok := val.([]byte)
- if !ok {
- continue
- }
- c := chunk{it.t.file, b}
- if k[i], err = c.expand(); err != nil {
- return nil, nil, err
- }
- }
- if err = enforce(k, it.t.colsK); err != nil {
- return
- }
- if v, err = lldb.DecodeScalars(bv); err != nil {
- return
- }
- for i, val := range v {
- b, ok := val.([]byte)
- if !ok {
- continue
- }
- c := chunk{it.t.file, b}
- if v[i], err = c.expand(); err != nil {
- return nil, nil, err
- }
- }
- err = enforce(v, it.t.colsV)
- return
- }
- func enforce(val []interface{}, cols []*col) (err error) {
- for i, v := range val {
- if val[i], err = convert(v, cols[i].typ); err != nil {
- return
- }
- }
- return
- }
- //NTYPE
- func infer(from []interface{}, to *[]*col) {
- if len(*to) == 0 {
- *to = make([]*col, len(from))
- for i := range *to {
- (*to)[i] = &col{}
- }
- }
- for i, c := range *to {
- if f := from[i]; f != nil {
- switch x := f.(type) {
- //case nil:
- case idealComplex:
- c.typ = qComplex128
- from[i] = complex128(x)
- case idealFloat:
- c.typ = qFloat64
- from[i] = float64(x)
- case idealInt:
- c.typ = qInt64
- from[i] = int64(x)
- case idealRune:
- c.typ = qInt32
- from[i] = int32(x)
- case idealUint:
- c.typ = qUint64
- from[i] = uint64(x)
- case bool:
- c.typ = qBool
- case complex128:
- c.typ = qComplex128
- case complex64:
- c.typ = qComplex64
- case float64:
- c.typ = qFloat64
- case float32:
- c.typ = qFloat32
- case int8:
- c.typ = qInt8
- case int16:
- c.typ = qInt16
- case int32:
- c.typ = qInt32
- case int64:
- c.typ = qInt64
- case string:
- c.typ = qString
- case uint8:
- c.typ = qUint8
- case uint16:
- c.typ = qUint16
- case uint32:
- c.typ = qUint32
- case uint64:
- c.typ = qUint64
- case []byte:
- c.typ = qBlob
- case *big.Int:
- c.typ = qBigInt
- case *big.Rat:
- c.typ = qBigRat
- case time.Time:
- c.typ = qTime
- case time.Duration:
- c.typ = qDuration
- case chunk:
- vals, err := lldb.DecodeScalars([]byte(x.b))
- if err != nil {
- panic(err)
- }
- if len(vals) == 0 {
- panic("internal error 040")
- }
- i, ok := vals[0].(int64)
- if !ok {
- panic("internal error 041")
- }
- c.typ = int(i)
- case map[string]interface{}: // map of ids of a cross join
- default:
- panic("internal error 042")
- }
- }
- }
- }
- type fileTemp struct {
- *file
- colsK []*col
- colsV []*col
- t *lldb.BTree
- }
- func (t *fileTemp) BeginTransaction() error {
- return nil
- }
- func (t *fileTemp) Get(k []interface{}) (v []interface{}, err error) {
- if err = expand(k); err != nil {
- return
- }
- if err = t.flatten(k); err != nil {
- return nil, err
- }
- bk, err := lldb.EncodeScalars(k...)
- if err != nil {
- return
- }
- bv, err := t.t.Get(nil, bk)
- if err != nil {
- return
- }
- return lldb.DecodeScalars(bv)
- }
- func (t *fileTemp) Drop() (err error) {
- if t.f0 == nil {
- return
- }
- fn := t.f0.Name()
- if err = t.f0.Close(); err != nil {
- return
- }
- if fn == "" {
- return
- }
- return os.Remove(fn)
- }
- func (t *fileTemp) SeekFirst() (it btreeIterator, err error) {
- en, err := t.t.SeekFirst()
- if err != nil {
- return
- }
- return &fileBTreeIterator{t: t, en: en}, nil
- }
- func (t *fileTemp) Set(k, v []interface{}) (err error) {
- if err = expand(k); err != nil {
- return
- }
- if err = expand(v); err != nil {
- return
- }
- infer(k, &t.colsK)
- infer(v, &t.colsV)
- if err = t.flatten(k); err != nil {
- return
- }
- bk, err := lldb.EncodeScalars(k...)
- if err != nil {
- return
- }
- if err = t.flatten(v); err != nil {
- return
- }
- bv, err := lldb.EncodeScalars(v...)
- if err != nil {
- return
- }
- return t.t.Set(bk, bv)
- }
- type file struct {
- a *lldb.Allocator
- codec *gobCoder
- f lldb.Filer
- f0 lldb.OSFile
- id int64
- lck io.Closer
- mu sync.Mutex
- name string
- tempFile func(dir, prefix string) (f lldb.OSFile, err error)
- wal *os.File
- }
- func newFileFromOSFile(f lldb.OSFile) (fi *file, err error) {
- nm := lockName(f.Name())
- lck, err := lock.Lock(nm)
- if err != nil {
- if lck != nil {
- lck.Close()
- }
- return nil, err
- }
- close := true
- defer func() {
- if close && lck != nil {
- lck.Close()
- }
- }()
- var w *os.File
- closew := false
- wn := walName(f.Name())
- w, err = os.OpenFile(wn, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0666)
- closew = true
- defer func() {
- if w != nil && closew {
- nm := w.Name()
- w.Close()
- os.Remove(nm)
- w = nil
- }
- }()
- if err != nil {
- if !os.IsExist(err) {
- return nil, err
- }
- closew = false
- w, err = os.OpenFile(wn, os.O_RDWR, 0666)
- if err != nil {
- return nil, err
- }
- closew = true
- st, err := w.Stat()
- if err != nil {
- return nil, err
- }
- if st.Size() != 0 {
- return nil, fmt.Errorf("(file-001) non empty WAL file %s exists", wn)
- }
- }
- info, err := f.Stat()
- if err != nil {
- return nil, err
- }
- switch sz := info.Size(); {
- case sz == 0:
- b := make([]byte, 16)
- copy(b, []byte(magic))
- if _, err := f.Write(b); err != nil {
- return nil, err
- }
- filer := lldb.Filer(lldb.NewOSFiler(f))
- filer = lldb.NewInnerFiler(filer, 16)
- if filer, err = lldb.NewACIDFiler(filer, w); err != nil {
- return nil, err
- }
- a, err := lldb.NewAllocator(filer, &lldb.Options{})
- if err != nil {
- return nil, err
- }
- a.Compress = true
- s := &file{
- a: a,
- codec: newGobCoder(),
- f0: f,
- f: filer,
- lck: lck,
- name: f.Name(),
- wal: w,
- }
- if err = s.BeginTransaction(); err != nil {
- return nil, err
- }
- h, err := s.Create()
- if err != nil {
- return nil, err
- }
- if h != 1 { // root
- panic("internal error 043")
- }
- if h, err = s.a.Alloc(make([]byte, 8)); err != nil {
- return nil, err
- }
- if h != 2 { // id
- panic("internal error 044")
- }
- close, closew = false, false
- return s, s.Commit()
- default:
- b := make([]byte, 16)
- if _, err := f.Read(b); err != nil {
- return nil, err
- }
- if string(b[:len(magic)]) != magic {
- return nil, fmt.Errorf("(file-002) unknown file format")
- }
- filer := lldb.Filer(lldb.NewOSFiler(f))
- filer = lldb.NewInnerFiler(filer, 16)
- if filer, err = lldb.NewACIDFiler(filer, w); err != nil {
- return nil, err
- }
- a, err := lldb.NewAllocator(filer, &lldb.Options{})
- if err != nil {
- return nil, err
- }
- bid, err := a.Get(nil, 2) // id
- if err != nil {
- return nil, err
- }
- if len(bid) != 8 {
- return nil, fmt.Errorf("(file-003) corrupted DB: id |% x|", bid)
- }
- id := int64(0)
- for _, v := range bid {
- id = (id << 8) | int64(v)
- }
- a.Compress = true
- s := &file{
- a: a,
- codec: newGobCoder(),
- f0: f,
- f: filer,
- id: id,
- lck: lck,
- name: f.Name(),
- wal: w,
- }
- close, closew = false, false
- return s, nil
- }
- }
- func (s *file) OpenIndex(unique bool, handle int64) (btreeIndex, error) {
- t, err := lldb.OpenBTree(s.a, s.collate, handle)
- if err != nil {
- return nil, err
- }
- return &fileIndex{s, handle, t, unique, newGobCoder()}, nil
- }
- func (s *file) CreateIndex(unique bool) ( /* handle */ int64, btreeIndex, error) {
- t, h, err := lldb.CreateBTree(s.a, s.collate)
- if err != nil {
- return -1, nil, err
- }
- return h, &fileIndex{s, h, t, unique, newGobCoder()}, nil
- }
- func (s *file) Acid() bool { return s.wal != nil }
// errSet returns the first non-nil error of the sequence *p, errs[0],
// errs[1], ... and nil if there is none. When *p is nil on entry it is set
// to the returned value; a non-nil *p is returned as is and left untouched.
func errSet(p *error, errs ...error) (err error) {
	if err = *p; err != nil {
		return err
	}

	for _, e := range errs {
		*p, err = e, e
		if e != nil {
			break
		}
	}
	return err
}
- func (s *file) lock() func() {
- s.mu.Lock()
- return s.mu.Unlock
- }
- func (s *file) Close() (err error) {
- defer s.lock()()
- es := s.f0.Sync()
- ef := s.f0.Close()
- var ew error
- if s.wal != nil {
- ew = s.wal.Close()
- }
- el := s.lck.Close()
- return errSet(&err, es, ef, ew, el)
- }
- func (s *file) Name() string { return s.name }
- func (s *file) Verify() (allocs int64, err error) {
- defer s.lock()()
- var stat lldb.AllocStats
- if err = s.a.Verify(lldb.NewMemFiler(), nil, &stat); err != nil {
- return
- }
- allocs = stat.AllocAtoms
- return
- }
- func (s *file) expandBytes(d []interface{}) (err error) {
- for i, v := range d {
- b, ok := v.([]byte)
- if !ok {
- continue
- }
- d[i], err = s.loadChunks(b)
- if err != nil {
- return
- }
- }
- return
- }
- func (s *file) collate(a, b []byte) int { //TODO w/ error return
- da, err := lldb.DecodeScalars(a)
- if err != nil {
- panic(err)
- }
- if err = s.expandBytes(da); err != nil {
- panic(err)
- }
- db, err := lldb.DecodeScalars(b)
- if err != nil {
- panic(err)
- }
- if err = s.expandBytes(db); err != nil {
- panic(err)
- }
- //dbg("da: %v, db: %v", da, db)
- return collate(da, db)
- }
- func (s *file) CreateTemp(asc bool) (bt temp, err error) {
- f, err := s.tempFile("", "ql-tmp-")
- if err != nil {
- return nil, err
- }
- fn := f.Name()
- filer := lldb.NewOSFiler(f)
- a, err := lldb.NewAllocator(filer, &lldb.Options{})
- if err != nil {
- f.Close()
- os.Remove(fn)
- return nil, err
- }
- k := 1
- if !asc {
- k = -1
- }
- t, _, err := lldb.CreateBTree(a, func(a, b []byte) int { //TODO w/ error return
- return k * s.collate(a, b)
- })
- if err != nil {
- f.Close()
- if fn != "" {
- os.Remove(fn)
- }
- return nil, err
- }
- x := &fileTemp{file: &file{
- a: a,
- codec: newGobCoder(),
- f0: f,
- },
- t: t}
- return x, nil
- }
- func (s *file) BeginTransaction() (err error) {
- defer s.lock()()
- return s.f.BeginUpdate()
- }
- func (s *file) Rollback() (err error) {
- defer s.lock()()
- return s.f.Rollback()
- }
- func (s *file) Commit() (err error) {
- defer s.lock()()
- return s.f.EndUpdate()
- }
- func (s *file) Create(data ...interface{}) (h int64, err error) {
- if err = expand(data); err != nil {
- return
- }
- if err = s.flatten(data); err != nil {
- return
- }
- b, err := lldb.EncodeScalars(data...)
- if err != nil {
- return
- }
- defer s.lock()()
- return s.a.Alloc(b)
- }
- func (s *file) Delete(h int64, blobCols ...*col) (err error) {
- switch len(blobCols) {
- case 0:
- defer s.lock()()
- return s.a.Free(h)
- default:
- return s.free(h, blobCols)
- }
- }
- func (s *file) ResetID() (err error) {
- s.id = 0
- return
- }
- func (s *file) ID() (int64, error) {
- defer s.lock()()
- s.id++
- b := make([]byte, 8)
- id := s.id
- for i := 7; i >= 0; i-- {
- b[i] = byte(id)
- id >>= 8
- }
- return s.id, s.a.Realloc(2, b)
- }
- func (s *file) free(h int64, blobCols []*col) (err error) {
- b, err := s.a.Get(nil, h) //LATER +bufs
- if err != nil {
- return
- }
- rec, err := lldb.DecodeScalars(b)
- if err != nil {
- return
- }
- for _, col := range blobCols {
- if col.index >= len(rec) {
- return fmt.Errorf("(file-004) file.free: corrupted DB (record len)")
- }
- if col.index+2 >= len(rec) {
- continue
- }
- switch x := rec[col.index+2].(type) {
- case nil:
- // nop
- case []byte:
- if err = s.freeChunks(x); err != nil {
- return
- }
- }
- }
- defer s.lock()()
- return s.a.Free(h)
- }
- func (s *file) Read(dst []interface{}, h int64, cols ...*col) (data []interface{}, err error) { //NTYPE
- b, err := s.a.Get(nil, h) //LATER +bufs
- if err != nil {
- return
- }
- rec, err := lldb.DecodeScalars(b)
- if err != nil {
- return
- }
- for _, col := range cols {
- i := col.index + 2
- if i >= len(rec) || rec[i] == nil {
- continue
- }
- switch col.typ {
- case 0:
- case qBool:
- case qComplex64:
- rec[i] = complex64(rec[i].(complex128))
- case qComplex128:
- case qFloat32:
- rec[i] = float32(rec[i].(float64))
- case qFloat64:
- case qInt8:
- rec[i] = int8(rec[i].(int64))
- case qInt16:
- rec[i] = int16(rec[i].(int64))
- case qInt32:
- rec[i] = int32(rec[i].(int64))
- case qInt64:
- case qString:
- case qUint8:
- rec[i] = uint8(rec[i].(uint64))
- case qUint16:
- rec[i] = uint16(rec[i].(uint64))
- case qUint32:
- rec[i] = uint32(rec[i].(uint64))
- case qUint64:
- case qBlob, qBigInt, qBigRat, qTime, qDuration:
- switch x := rec[i].(type) {
- case []byte:
- rec[i] = chunk{f: s, b: x}
- default:
- return nil, fmt.Errorf("(file-006) corrupted DB: non nil chunk type is not []byte")
- }
- default:
- panic("internal error 045")
- }
- }
- if cols != nil {
- for n, dn := len(cols)+2, len(rec); dn < n; dn++ {
- rec = append(rec, nil)
- }
- }
- return rec, nil
- }
- func (s *file) freeChunks(enc []byte) (err error) {
- items, err := lldb.DecodeScalars(enc)
- if err != nil {
- return
- }
- var ok bool
- var next int64
- switch len(items) {
- case 2:
- return
- case 3:
- if next, ok = items[1].(int64); !ok || next == 0 {
- return fmt.Errorf("(file-007) corrupted DB: first chunk link")
- }
- default:
- return fmt.Errorf("(file-008) corrupted DB: first chunk")
- }
- for next != 0 {
- b, err := s.a.Get(nil, next)
- if err != nil {
- return err
- }
- if items, err = lldb.DecodeScalars(b); err != nil {
- return err
- }
- var h int64
- switch len(items) {
- case 1:
- // nop
- case 2:
- if h, ok = items[0].(int64); !ok {
- return fmt.Errorf("(file-009) corrupted DB: chunk link")
- }
- default:
- return fmt.Errorf("(file-010) corrupted DB: chunk items %d (%v)", len(items), items)
- }
- s.mu.Lock()
- if err = s.a.Free(next); err != nil {
- s.mu.Unlock()
- return err
- }
- s.mu.Unlock()
- next = h
- }
- return
- }
- func (s *file) loadChunks(enc []byte) (v interface{}, err error) {
- items, err := lldb.DecodeScalars(enc)
- if err != nil {
- return
- }
- var ok bool
- var next int64
- switch len(items) {
- case 2:
- // nop
- case 3:
- if next, ok = items[1].(int64); !ok || next == 0 {
- return nil, fmt.Errorf("(file-011) corrupted DB: first chunk link")
- }
- default:
- //fmt.Printf("%d: %#v\n", len(items), items)
- return nil, fmt.Errorf("(file-012) corrupted DB: first chunk")
- }
- typ, ok := items[0].(int64)
- if !ok {
- return nil, fmt.Errorf("(file-013) corrupted DB: first chunk tag")
- }
- buf, ok := items[len(items)-1].([]byte)
- if !ok {
- return nil, fmt.Errorf("(file-014) corrupted DB: first chunk data")
- }
- for next != 0 {
- b, err := s.a.Get(nil, next)
- if err != nil {
- return nil, err
- }
- if items, err = lldb.DecodeScalars(b); err != nil {
- return nil, err
- }
- switch len(items) {
- case 1:
- next = 0
- case 2:
- if next, ok = items[0].(int64); !ok {
- return nil, fmt.Errorf("(file-015) corrupted DB: chunk link")
- }
- items = items[1:]
- default:
- return nil, fmt.Errorf("(file-016) corrupted DB: chunk items %d (%v)", len(items), items)
- }
- if b, ok = items[0].([]byte); !ok {
- return nil, fmt.Errorf("(file-017) corrupted DB: chunk data")
- }
- buf = append(buf, b...)
- }
- return s.codec.decode(buf, int(typ))
- }
- func (s *file) Update(h int64, data ...interface{}) (err error) {
- b, err := lldb.EncodeScalars(data...)
- if err != nil {
- return
- }
- defer s.lock()()
- return s.a.Realloc(h, b)
- }
- func (s *file) UpdateRow(h int64, blobCols []*col, data ...interface{}) (err error) {
- if len(blobCols) == 0 {
- return s.Update(h, data...)
- }
- if err = expand(data); err != nil {
- return
- }
- data0, err := s.Read(nil, h, blobCols...)
- if err != nil {
- return
- }
- for _, c := range blobCols {
- if c.index+2 >= len(data0) {
- continue
- }
- if x := data0[c.index+2]; x != nil {
- if err = s.freeChunks(x.(chunk).b); err != nil {
- return
- }
- }
- }
- if err = s.flatten(data); err != nil {
- return
- }
- return s.Update(h, data...)
- }
- // []interface{}{qltype, ...}->[]interface{}{lldb scalar type, ...}
- // + long blobs are (pre)written to a chain of chunks.
- func (s *file) flatten(data []interface{}) (err error) {
- for i, v := range data {
- tag := 0
- var b []byte
- switch x := v.(type) {
- case []byte:
- tag = qBlob
- b = x
- case *big.Int:
- tag = qBigInt
- b, err = s.codec.encode(x)
- case *big.Rat:
- tag = qBigRat
- b, err = s.codec.encode(x)
- case time.Time:
- tag = qTime
- b, err = s.codec.encode(x)
- case time.Duration:
- tag = qDuration
- b, err = s.codec.encode(x)
- default:
- continue
- }
- if err != nil {
- return
- }
- const chunk = 1 << 16
- chunks := 0
- var next int64
- var buf []byte
- for rem := len(b); rem > shortBlob; {
- n := mathutil.Min(rem, chunk)
- part := b[rem-n:]
- b = b[:rem-n]
- rem -= n
- switch next {
- case 0: // last chunk
- buf, err = lldb.EncodeScalars([]interface{}{part}...)
- default: // middle chunk
- buf, err = lldb.EncodeScalars([]interface{}{next, part}...)
- }
- if err != nil {
- return
- }
- s.mu.Lock()
- h, err := s.a.Alloc(buf)
- s.mu.Unlock()
- if err != nil {
- return err
- }
- next = h
- chunks++
- }
- switch next {
- case 0: // single chunk
- buf, err = lldb.EncodeScalars([]interface{}{tag, b}...)
- default: // multi chunks
- buf, err = lldb.EncodeScalars([]interface{}{tag, next, b}...)
- }
- if err != nil {
- return
- }
- data[i] = buf
- }
- return
- }
// lockName returns the name of the lock file of the DB file dbname: a dot
// followed by the hex SHA-1 of the cleaned base name of dbname with
// "lockfile" appended, placed in the directory of dbname.
func lockName(dbname string) string {
	hasher := sha1.New()
	io.WriteString(hasher, filepath.Base(filepath.Clean(dbname))+"lockfile")
	hidden := fmt.Sprintf(".%x", hasher.Sum(nil))
	return filepath.Join(filepath.Dir(dbname), hidden)
}
// walName returns the name of the write ahead log of the DB file dbname: a
// dot followed by the hex SHA-1 of the cleaned base name of dbname, placed
// in the directory of dbname.
func walName(dbname string) (r string) {
	hasher := sha1.New()
	io.WriteString(hasher, filepath.Base(filepath.Clean(dbname)))
	hidden := fmt.Sprintf(".%x", hasher.Sum(nil))
	r = filepath.Join(filepath.Dir(dbname), hidden)
	return r
}
- type fileIndex struct {
- f *file
- h int64
- t *lldb.BTree
- unique bool
- codec *gobCoder
- }
- func (x *fileIndex) Clear() error {
- return x.t.Clear()
- }
- var gbZeroInt64 []byte
- func init() {
- var err error
- if gbZeroInt64, err = lldb.EncodeScalars(int64(0)); err != nil {
- panic(err)
- }
- }
// isIndexNull reports whether every item of data is nil. It is true for an
// empty or nil data.
func isIndexNull(data []interface{}) bool {
	for i := range data {
		if data[i] != nil {
			return false
		}
	}
	return true
}
- // The []byte version of the key in the BTree shares chunks, if any, with
- // the value stored in the record.
- func (x *fileIndex) Create(indexedValues []interface{}, h int64) error {
- for i, indexedValue := range indexedValues {
- chunk, ok := indexedValue.(chunk)
- if ok {
- indexedValues[i] = chunk.b
- }
- }
- t := x.t
- switch {
- case !x.unique:
- k, err := lldb.EncodeScalars(append(indexedValues, h)...)
- if err != nil {
- return err
- }
- return t.Set(k, gbZeroInt64)
- case isIndexNull(indexedValues): // unique, NULL
- k, err := lldb.EncodeScalars(nil, h)
- if err != nil {
- return err
- }
- return t.Set(k, gbZeroInt64)
- default: // unique, non NULL
- k, err := lldb.EncodeScalars(append(indexedValues, int64(0))...)
- if err != nil {
- return err
- }
- v, err := lldb.EncodeScalars(h)
- if err != nil {
- return err
- }
- _, _, err = t.Put(nil, k, func(key, old []byte) (new []byte, write bool, err error) {
- if old == nil {
- return v, true, nil
- }
- return nil, false, fmt.Errorf("(file-018) cannot insert into unique index: duplicate value(s): %v", indexedValues)
- })
- return err
- }
- }
- func (x *fileIndex) Delete(indexedValues []interface{}, h int64) error {
- for i, indexedValue := range indexedValues {
- chunk, ok := indexedValue.(chunk)
- if ok {
- indexedValues[i] = chunk.b
- }
- }
- t := x.t
- var k []byte
- var err error
- switch {
- case !x.unique:
- k, err = lldb.EncodeScalars(append(indexedValues, h)...)
- case isIndexNull(indexedValues): // unique, NULL
- k, err = lldb.EncodeScalars(nil, h)
- default: // unique, non NULL
- k, err = lldb.EncodeScalars(append(indexedValues, int64(0))...)
- }
- if err != nil {
- return err
- }
- return t.Delete(k)
- }
- func (x *fileIndex) Drop() error {
- if err := x.Clear(); err != nil {
- return err
- }
- return x.f.a.Free(x.h)
- }
- // []interface{}{qltype, ...}->[]interface{}{lldb scalar type, ...}
- func (x *fileIndex) flatten(data []interface{}) (err error) {
- for i, v := range data {
- tag := 0
- var b []byte
- switch xx := v.(type) {
- case []byte:
- tag = qBlob
- b = xx
- case *big.Int:
- tag = qBigInt
- b, err = x.codec.encode(xx)
- case *big.Rat:
- tag = qBigRat
- b, err = x.codec.encode(xx)
- case time.Time:
- tag = qTime
- b, err = x.codec.encode(xx)
- case time.Duration:
- tag = qDuration
- b, err = x.codec.encode(xx)
- default:
- continue
- }
- if err != nil {
- return
- }
- var buf []byte
- if buf, err = lldb.EncodeScalars([]interface{}{tag, b}...); err != nil {
- return
- }
- data[i] = buf
- }
- return
- }
- func (x *fileIndex) Seek(indexedValues []interface{}) (indexIterator, bool, error) {
- data := append(indexedValues, 0)
- if err := x.flatten(data); err != nil {
- return nil, false, err
- }
- k, err := lldb.EncodeScalars(data...)
- if err != nil {
- return nil, false, err
- }
- en, hit, err := x.t.Seek(k)
- if err != nil {
- return nil, false, err
- }
- return &fileIndexIterator{x.f, en, x.unique}, hit, nil
- }
- func (x *fileIndex) SeekFirst() (iter indexIterator, err error) {
- en, err := x.t.SeekFirst()
- return &fileIndexIterator{x.f, en, x.unique}, err
- }
- func (x *fileIndex) SeekLast() (iter indexIterator, err error) {
- en, err := x.t.SeekLast()
- return &fileIndexIterator{x.f, en, x.unique}, err
- }
- type fileIndexIterator struct {
- f *file
- en *lldb.BTreeEnumerator
- unique bool
- }
- func (i *fileIndexIterator) nextPrev(f func() ([]byte, []byte, error)) ([]interface{}, int64, error) { //TODO(indices) blobs: +test
- bk, bv, err := f()
- if err != nil {
- return nil, -1, err
- }
- dk, err := lldb.DecodeScalars(bk)
- if err != nil {
- return nil, -1, err
- }
- b, ok := dk[0].([]byte)
- if ok {
- dk[0] = chunk{i.f, b}
- if expand(dk[:1]); err != nil {
- return nil, -1, err
- }
- }
- var k indexKey
- k.value = dk[:len(dk)-1]
- switch i.unique {
- case true:
- if isIndexNull(k.value) {
- return nil, dk[len(dk)-1].(int64), nil
- }
- dv, err := lldb.DecodeScalars(bv)
- if err != nil {
- return nil, -1, err
- }
- return k.value, dv[0].(int64), nil
- default:
- return k.value, dk[len(dk)-1].(int64), nil
- }
- }
- func (i *fileIndexIterator) Next() ([]interface{}, int64, error) { //TODO(indices) blobs: +test
- return i.nextPrev(i.en.Next)
- }
- func (i *fileIndexIterator) Prev() ([]interface{}, int64, error) { //TODO(indices) blobs: +test
- return i.nextPrev(i.en.Prev)
- }
|