driver.go 13 KB


  1. // Copyright 2014 The ql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // database/sql/driver
  5. package ql
  6. import (
  7. "bytes"
  8. "database/sql"
  9. "database/sql/driver"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "math/big"
  14. "net/url"
  15. "os"
  16. "path/filepath"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
  22. var (
  23. _ driver.Conn = (*driverConn)(nil)
  24. _ driver.Driver = (*sqlDriver)(nil)
  25. _ driver.Execer = (*driverConn)(nil)
  26. _ driver.Queryer = (*driverConn)(nil)
  27. _ driver.Result = (*driverResult)(nil)
  28. _ driver.Rows = (*driverRows)(nil)
  29. _ driver.Stmt = (*driverStmt)(nil)
  30. _ driver.Tx = (*driverConn)(nil)
  31. txBegin = MustCompile("BEGIN TRANSACTION;")
  32. txCommit = MustCompile("COMMIT;")
  33. txRollback = MustCompile("ROLLBACK;")
  34. errNoResult = errors.New("query statement does not produce a result set (no top level SELECT)")
  35. )
  36. type errList []error
  37. func (e *errList) append(err error) {
  38. if err != nil {
  39. *e = append(*e, err)
  40. }
  41. }
  42. func (e errList) error() error {
  43. if len(e) == 0 {
  44. return nil
  45. }
  46. return e
  47. }
  48. func (e errList) Error() string {
  49. a := make([]string, len(e))
  50. for i, v := range e {
  51. a[i] = v.Error()
  52. }
  53. return strings.Join(a, "\n")
  54. }
  55. func params(args []driver.Value) []interface{} {
  56. r := make([]interface{}, len(args))
  57. for i, v := range args {
  58. r[i] = interface{}(v)
  59. }
  60. return r
  61. }
  62. var (
  63. fileDriver = &sqlDriver{dbs: map[string]*driverDB{}}
  64. fileDriverOnce sync.Once
  65. memDriver = &sqlDriver{isMem: true, dbs: map[string]*driverDB{}}
  66. memDriverOnce sync.Once
  67. )
  68. // RegisterDriver registers a QL database/sql/driver[0] named "ql". The name
  69. // parameter of
  70. //
  71. // sql.Open("ql", name)
  72. //
  73. // is interpreted as a path name to a named DB file which will be created if
  74. // not present. The underlying QL database data are persisted on db.Close().
  75. // RegisterDriver can be safely called multiple times, it'll register the
  76. // driver only once.
  77. //
  78. // The name argument can be optionally prefixed by "file://". In that case the
  79. // prefix is stripped before interpreting it as a file name.
  80. //
  81. // The name argument can be optionally prefixed by "memory://". In that case
  82. // the prefix is stripped before interpreting it as a name of a memory-only,
  83. // volatile DB.
  84. //
  85. // [0]: http://golang.org/pkg/database/sql/driver/
  86. func RegisterDriver() {
  87. fileDriverOnce.Do(func() { sql.Register("ql", fileDriver) })
  88. }
  89. // RegisterMemDriver registers a QL memory database/sql/driver[0] named
  90. // "ql-mem". The name parameter of
  91. //
  92. // sql.Open("ql-mem", name)
  93. //
  94. // is interpreted as an unique memory DB name which will be created if not
  95. // present. The underlying QL memory database data are not persisted on
  96. // db.Close(). RegisterMemDriver can be safely called multiple times, it'll
  97. // register the driver only once.
  98. //
  99. // [0]: http://golang.org/pkg/database/sql/driver/
  100. func RegisterMemDriver() {
  101. memDriverOnce.Do(func() { sql.Register("ql-mem", memDriver) })
  102. }
  103. type driverDB struct {
  104. db *DB
  105. name string
  106. refcount int
  107. }
  108. func newDriverDB(db *DB, name string) *driverDB {
  109. return &driverDB{db: db, name: name, refcount: 1}
  110. }
  111. // sqlDriver implements the interface required by database/sql/driver.
  112. type sqlDriver struct {
  113. dbs map[string]*driverDB
  114. isMem bool
  115. mu sync.Mutex
  116. }
  117. func (d *sqlDriver) lock() func() {
  118. d.mu.Lock()
  119. return d.mu.Unlock
  120. }
  121. // Open returns a new connection to the database. The name is a string in a
  122. // driver-specific format.
  123. //
  124. // Open may return a cached connection (one previously closed), but doing so is
  125. // unnecessary; the sql package maintains a pool of idle connections for
  126. // efficient re-use.
  127. //
  128. // The returned connection is only used by one goroutine at a time.
  129. //
  130. // The name supported URL parameters:
  131. //
  132. // headroom Size of the WAL headroom. See https://github.com/cznic/ql/issues/140.
  133. func (d *sqlDriver) Open(name string) (driver.Conn, error) {
  134. switch {
  135. case d == fileDriver:
  136. if !strings.Contains(name, "://") && !strings.HasPrefix(name, "file") {
  137. name = "file://" + name
  138. }
  139. case d == memDriver:
  140. if !strings.Contains(name, "://") && !strings.HasPrefix(name, "memory") {
  141. name = "memory://" + name
  142. }
  143. default:
  144. return nil, fmt.Errorf("open: unexpected/unsupported instance of driver.Driver: %p", d)
  145. }
  146. name = filepath.ToSlash(name) // Ensure / separated URLs on Windows
  147. uri, err := url.Parse(name)
  148. if err != nil {
  149. return nil, err
  150. }
  151. switch uri.Scheme {
  152. case "file":
  153. // ok
  154. case "memory":
  155. d = memDriver
  156. default:
  157. return nil, fmt.Errorf("open: unexpected/unsupported scheme: %s", uri.Scheme)
  158. }
  159. name = filepath.Clean(filepath.Join(uri.Host, uri.Path))
  160. if d == fileDriver && (name == "" || name == "." || name == string(os.PathSeparator)) {
  161. return nil, fmt.Errorf("invalid DB name %q", name)
  162. }
  163. var headroom int64
  164. if a := uri.Query()["headroom"]; len(a) != 0 {
  165. if headroom, err = strconv.ParseInt(a[0], 10, 64); err != nil {
  166. return nil, err
  167. }
  168. }
  169. defer d.lock()()
  170. db := d.dbs[name]
  171. if db == nil {
  172. var err error
  173. var db0 *DB
  174. switch d.isMem {
  175. case true:
  176. db0, err = OpenMem()
  177. default:
  178. db0, err = OpenFile(name, &Options{CanCreate: true, Headroom: headroom})
  179. }
  180. if err != nil {
  181. return nil, err
  182. }
  183. db = newDriverDB(db0, name)
  184. d.dbs[name] = db
  185. return newDriverConn(d, db), nil
  186. }
  187. db.refcount++
  188. return newDriverConn(d, db), nil
  189. }
  190. // driverConn is a connection to a database. It is not used concurrently by
  191. // multiple goroutines.
  192. //
  193. // Conn is assumed to be stateful.
  194. type driverConn struct {
  195. ctx *TCtx
  196. db *driverDB
  197. driver *sqlDriver
  198. stop map[*driverStmt]struct{}
  199. tnl int
  200. }
  201. func newDriverConn(d *sqlDriver, ddb *driverDB) driver.Conn {
  202. r := &driverConn{
  203. db: ddb,
  204. driver: d,
  205. stop: map[*driverStmt]struct{}{},
  206. }
  207. return r
  208. }
  209. // Prepare returns a prepared statement, bound to this connection.
  210. func (c *driverConn) Prepare(query string) (driver.Stmt, error) {
  211. list, err := Compile(query)
  212. if err != nil {
  213. return nil, err
  214. }
  215. s := &driverStmt{conn: c, stmt: list}
  216. c.stop[s] = struct{}{}
  217. return s, nil
  218. }
  219. // Close invalidates and potentially stops any current prepared statements and
  220. // transactions, marking this connection as no longer in use.
  221. //
  222. // Because the sql package maintains a free pool of connections and only calls
  223. // Close when there's a surplus of idle connections, it shouldn't be necessary
  224. // for drivers to do their own connection caching.
  225. func (c *driverConn) Close() error {
  226. var err errList
  227. for s := range c.stop {
  228. err.append(s.Close())
  229. }
  230. defer c.driver.lock()()
  231. dbs, name := c.driver.dbs, c.db.name
  232. v := dbs[name]
  233. v.refcount--
  234. if v.refcount == 0 {
  235. err.append(c.db.db.Close())
  236. delete(dbs, name)
  237. }
  238. return err.error()
  239. }
  240. // Begin starts and returns a new transaction.
  241. func (c *driverConn) Begin() (driver.Tx, error) {
  242. if c.ctx == nil {
  243. c.ctx = NewRWCtx()
  244. }
  245. if _, _, err := c.db.db.Execute(c.ctx, txBegin); err != nil {
  246. return nil, err
  247. }
  248. c.tnl++
  249. return c, nil
  250. }
  251. func (c *driverConn) Commit() error {
  252. if c.tnl == 0 || c.ctx == nil {
  253. return errCommitNotInTransaction
  254. }
  255. if _, _, err := c.db.db.Execute(c.ctx, txCommit); err != nil {
  256. return err
  257. }
  258. c.tnl--
  259. if c.tnl == 0 {
  260. c.ctx = nil
  261. }
  262. return nil
  263. }
  264. func (c *driverConn) Rollback() error {
  265. if c.tnl == 0 || c.ctx == nil {
  266. return errRollbackNotInTransaction
  267. }
  268. if _, _, err := c.db.db.Execute(c.ctx, txRollback); err != nil {
  269. return err
  270. }
  271. c.tnl--
  272. if c.tnl == 0 {
  273. c.ctx = nil
  274. }
  275. return nil
  276. }
  277. // Execer is an optional interface that may be implemented by a Conn.
  278. //
  279. // If a Conn does not implement Execer, the sql package's DB.Exec will first
  280. // prepare a query, execute the statement, and then close the statement.
  281. //
  282. // Exec may return driver.ErrSkip.
  283. func (c *driverConn) Exec(query string, args []driver.Value) (driver.Result, error) {
  284. list, err := Compile(query)
  285. if err != nil {
  286. return nil, err
  287. }
  288. return driverExec(c.db, c.ctx, list, args)
  289. }
  290. func driverExec(db *driverDB, ctx *TCtx, list List, args []driver.Value) (driver.Result, error) {
  291. if _, _, err := db.db.Execute(ctx, list, params(args)...); err != nil {
  292. return nil, err
  293. }
  294. if len(list.l) == 1 {
  295. switch list.l[0].(type) {
  296. case *createTableStmt, *dropTableStmt, *alterTableAddStmt,
  297. *alterTableDropColumnStmt, *truncateTableStmt:
  298. return driver.ResultNoRows, nil
  299. }
  300. }
  301. r := &driverResult{}
  302. if ctx != nil {
  303. r.lastInsertID, r.rowsAffected = ctx.LastInsertID, ctx.RowsAffected
  304. }
  305. return r, nil
  306. }
  307. // Queryer is an optional interface that may be implemented by a Conn.
  308. //
  309. // If a Conn does not implement Queryer, the sql package's DB.Query will first
  310. // prepare a query, execute the statement, and then close the statement.
  311. //
  312. // Query may return driver.ErrSkip.
  313. func (c *driverConn) Query(query string, args []driver.Value) (driver.Rows, error) {
  314. list, err := Compile(query)
  315. if err != nil {
  316. return nil, err
  317. }
  318. return driverQuery(c.db, c.ctx, list, args)
  319. }
  320. func driverQuery(db *driverDB, ctx *TCtx, list List, args []driver.Value) (driver.Rows, error) {
  321. rss, _, err := db.db.Execute(ctx, list, params(args)...)
  322. if err != nil {
  323. return nil, err
  324. }
  325. switch n := len(rss); n {
  326. case 0:
  327. return nil, errNoResult
  328. case 1:
  329. return newdriverRows(rss[len(rss)-1]), nil
  330. default:
  331. return nil, fmt.Errorf("query produced %d result sets, expected only one", n)
  332. }
  333. }
  334. // driverResult is the result of a query execution.
  335. type driverResult struct {
  336. lastInsertID int64
  337. rowsAffected int64
  338. }
  339. // LastInsertId returns the database's auto-generated ID after, for example, an
  340. // INSERT into a table with primary key.
  341. func (r *driverResult) LastInsertId() (int64, error) { // -golint
  342. return r.lastInsertID, nil
  343. }
  344. // RowsAffected returns the number of rows affected by the query.
  345. func (r *driverResult) RowsAffected() (int64, error) {
  346. return r.rowsAffected, nil
  347. }
  348. // driverRows is an iterator over an executed query's results.
  349. type driverRows struct {
  350. rs Recordset
  351. done chan int
  352. rows chan interface{}
  353. }
  354. func newdriverRows(rs Recordset) *driverRows {
  355. r := &driverRows{
  356. rs: rs,
  357. done: make(chan int),
  358. rows: make(chan interface{}, 500),
  359. }
  360. go func() {
  361. err := io.EOF
  362. if e := r.rs.Do(false, func(data []interface{}) (bool, error) {
  363. select {
  364. case r.rows <- data:
  365. return true, nil
  366. case <-r.done:
  367. return false, nil
  368. }
  369. }); e != nil {
  370. err = e
  371. }
  372. select {
  373. case r.rows <- err:
  374. case <-r.done:
  375. }
  376. }()
  377. return r
  378. }
  379. // Columns returns the names of the columns. The number of columns of the
  380. // result is inferred from the length of the slice. If a particular column
  381. // name isn't known, an empty string should be returned for that entry.
  382. func (r *driverRows) Columns() []string {
  383. f, _ := r.rs.Fields()
  384. return f
  385. }
  386. // Close closes the rows iterator.
  387. func (r *driverRows) Close() error {
  388. close(r.done)
  389. return nil
  390. }
  391. // Next is called to populate the next row of data into the provided slice. The
  392. // provided slice will be the same size as the Columns() are wide.
  393. //
  394. // The dest slice may be populated only with a driver Value type, but excluding
  395. // string. All string values must be converted to []byte.
  396. //
  397. // Next should return io.EOF when there are no more rows.
  398. func (r *driverRows) Next(dest []driver.Value) error {
  399. select {
  400. case rx := <-r.rows:
  401. switch x := rx.(type) {
  402. case error:
  403. return x
  404. case []interface{}:
  405. if g, e := len(x), len(dest); g != e {
  406. return fmt.Errorf("field count mismatch: got %d, need %d", g, e)
  407. }
  408. for i, xi := range x {
  409. switch v := xi.(type) {
  410. case nil, int64, float64, bool, []byte, time.Time:
  411. dest[i] = v
  412. case complex64, complex128, *big.Int, *big.Rat:
  413. var buf bytes.Buffer
  414. fmt.Fprintf(&buf, "%v", v)
  415. dest[i] = buf.Bytes()
  416. case int8:
  417. dest[i] = int64(v)
  418. case int16:
  419. dest[i] = int64(v)
  420. case int32:
  421. dest[i] = int64(v)
  422. case int:
  423. dest[i] = int64(v)
  424. case uint8:
  425. dest[i] = int64(v)
  426. case uint16:
  427. dest[i] = int64(v)
  428. case uint32:
  429. dest[i] = int64(v)
  430. case uint64:
  431. dest[i] = int64(v)
  432. case uint:
  433. dest[i] = int64(v)
  434. case time.Duration:
  435. dest[i] = int64(v)
  436. case string:
  437. dest[i] = []byte(v)
  438. default:
  439. return fmt.Errorf("internal error 004")
  440. }
  441. }
  442. return nil
  443. default:
  444. return fmt.Errorf("internal error 005")
  445. }
  446. case <-r.done:
  447. return io.EOF
  448. }
  449. }
  450. // driverStmt is a prepared statement. It is bound to a driverConn and not used
  451. // by multiple goroutines concurrently.
  452. type driverStmt struct {
  453. conn *driverConn
  454. stmt List
  455. }
  456. // Close closes the statement.
  457. //
  458. // As of Go 1.1, a Stmt will not be closed if it's in use by any queries.
  459. func (s *driverStmt) Close() error {
  460. delete(s.conn.stop, s)
  461. return nil
  462. }
  463. // NumInput returns the number of placeholder parameters.
  464. //
  465. // If NumInput returns >= 0, the sql package will sanity check argument counts
  466. // from callers and return errors to the caller before the statement's Exec or
  467. // Query methods are called.
  468. //
  469. // NumInput may also return -1, if the driver doesn't know its number of
  470. // placeholders. In that case, the sql package will not sanity check Exec or
  471. // Query argument counts.
  472. func (s *driverStmt) NumInput() int {
  473. if x := s.stmt; len(x.l) == 1 {
  474. return x.params
  475. }
  476. return -1
  477. }
  478. // Exec executes a query that doesn't return rows, such as an INSERT or UPDATE.
  479. func (s *driverStmt) Exec(args []driver.Value) (driver.Result, error) {
  480. c := s.conn
  481. return driverExec(c.db, c.ctx, s.stmt, args)
  482. }
  483. // Exec executes a query that may return rows, such as a SELECT.
  484. func (s *driverStmt) Query(args []driver.Value) (driver.Rows, error) {
  485. c := s.conn
  486. return driverQuery(c.db, c.ctx, s.stmt, args)
  487. }