bolt.go 15 KB


  1. // +build !nobolt
  2. package dataprovider
  3. import (
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "path/filepath"
  9. "time"
  10. bolt "go.etcd.io/bbolt"
  11. "github.com/drakkan/sftpgo/logger"
  12. "github.com/drakkan/sftpgo/utils"
  13. )
  14. const (
  15. boltDatabaseVersion = 3
  16. )
  17. var (
  18. usersBucket = []byte("users")
  19. usersIDIdxBucket = []byte("users_id_idx")
  20. dbVersionBucket = []byte("db_version")
  21. dbVersionKey = []byte("version")
  22. )
  23. // BoltProvider auth provider for bolt key/value store
  24. type BoltProvider struct {
  25. dbHandle *bolt.DB
  26. }
  27. type compatUserV2 struct {
  28. ID int64 `json:"id"`
  29. Username string `json:"username"`
  30. Password string `json:"password,omitempty"`
  31. PublicKeys []string `json:"public_keys,omitempty"`
  32. HomeDir string `json:"home_dir"`
  33. UID int `json:"uid"`
  34. GID int `json:"gid"`
  35. MaxSessions int `json:"max_sessions"`
  36. QuotaSize int64 `json:"quota_size"`
  37. QuotaFiles int `json:"quota_files"`
  38. Permissions []string `json:"permissions"`
  39. UsedQuotaSize int64 `json:"used_quota_size"`
  40. UsedQuotaFiles int `json:"used_quota_files"`
  41. LastQuotaUpdate int64 `json:"last_quota_update"`
  42. UploadBandwidth int64 `json:"upload_bandwidth"`
  43. DownloadBandwidth int64 `json:"download_bandwidth"`
  44. ExpirationDate int64 `json:"expiration_date"`
  45. LastLogin int64 `json:"last_login"`
  46. Status int `json:"status"`
  47. }
  48. func init() {
  49. utils.AddFeature("+bolt")
  50. }
  51. func initializeBoltProvider(basePath string) error {
  52. var err error
  53. logSender = fmt.Sprintf("dataprovider_%v", BoltDataProviderName)
  54. dbPath := config.Name
  55. if !utils.IsFileInputValid(dbPath) {
  56. return fmt.Errorf("Invalid database path: %#v", dbPath)
  57. }
  58. if !filepath.IsAbs(dbPath) {
  59. dbPath = filepath.Join(basePath, dbPath)
  60. }
  61. dbHandle, err := bolt.Open(dbPath, 0600, &bolt.Options{
  62. NoGrowSync: false,
  63. FreelistType: bolt.FreelistArrayType,
  64. Timeout: 5 * time.Second})
  65. if err == nil {
  66. providerLog(logger.LevelDebug, "bolt key store handle created")
  67. err = dbHandle.Update(func(tx *bolt.Tx) error {
  68. _, e := tx.CreateBucketIfNotExists(usersBucket)
  69. return e
  70. })
  71. if err != nil {
  72. providerLog(logger.LevelWarn, "error creating users bucket: %v", err)
  73. return err
  74. }
  75. err = dbHandle.Update(func(tx *bolt.Tx) error {
  76. _, e := tx.CreateBucketIfNotExists(usersIDIdxBucket)
  77. return e
  78. })
  79. if err != nil {
  80. providerLog(logger.LevelWarn, "error creating username idx bucket: %v", err)
  81. return err
  82. }
  83. err = dbHandle.Update(func(tx *bolt.Tx) error {
  84. _, e := tx.CreateBucketIfNotExists(dbVersionBucket)
  85. return e
  86. })
  87. if err != nil {
  88. providerLog(logger.LevelWarn, "error creating database version bucket: %v", err)
  89. return err
  90. }
  91. provider = BoltProvider{dbHandle: dbHandle}
  92. } else {
  93. providerLog(logger.LevelWarn, "error creating bolt key/value store handler: %v", err)
  94. }
  95. return err
  96. }
  97. func (p BoltProvider) checkAvailability() error {
  98. _, err := p.getUsers(1, 0, "ASC", "")
  99. return err
  100. }
  101. func (p BoltProvider) validateUserAndPass(username string, password string) (User, error) {
  102. var user User
  103. if len(password) == 0 {
  104. return user, errors.New("Credentials cannot be null or empty")
  105. }
  106. user, err := p.userExists(username)
  107. if err != nil {
  108. providerLog(logger.LevelWarn, "error authenticating user: %v, error: %v", username, err)
  109. return user, err
  110. }
  111. return checkUserAndPass(user, password)
  112. }
  113. func (p BoltProvider) validateUserAndPubKey(username string, pubKey []byte) (User, string, error) {
  114. var user User
  115. if len(pubKey) == 0 {
  116. return user, "", errors.New("Credentials cannot be null or empty")
  117. }
  118. user, err := p.userExists(username)
  119. if err != nil {
  120. providerLog(logger.LevelWarn, "error authenticating user: %v, error: %v", username, err)
  121. return user, "", err
  122. }
  123. return checkUserAndPubKey(user, pubKey)
  124. }
  125. func (p BoltProvider) getUserByID(ID int64) (User, error) {
  126. var user User
  127. err := p.dbHandle.View(func(tx *bolt.Tx) error {
  128. bucket, idxBucket, err := getBuckets(tx)
  129. if err != nil {
  130. return err
  131. }
  132. userIDAsBytes := itob(ID)
  133. username := idxBucket.Get(userIDAsBytes)
  134. if username == nil {
  135. return &RecordNotFoundError{err: fmt.Sprintf("user with ID %v does not exist", ID)}
  136. }
  137. u := bucket.Get(username)
  138. if u == nil {
  139. return &RecordNotFoundError{err: fmt.Sprintf("username %#v and ID: %v does not exist", string(username), ID)}
  140. }
  141. return json.Unmarshal(u, &user)
  142. })
  143. return user, err
  144. }
  145. func (p BoltProvider) updateLastLogin(username string) error {
  146. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  147. bucket, _, err := getBuckets(tx)
  148. if err != nil {
  149. return err
  150. }
  151. var u []byte
  152. if u = bucket.Get([]byte(username)); u == nil {
  153. return &RecordNotFoundError{err: fmt.Sprintf("username %#v does not exist, unable to update last login", username)}
  154. }
  155. var user User
  156. err = json.Unmarshal(u, &user)
  157. if err != nil {
  158. return err
  159. }
  160. user.LastLogin = utils.GetTimeAsMsSinceEpoch(time.Now())
  161. buf, err := json.Marshal(user)
  162. if err != nil {
  163. return err
  164. }
  165. err = bucket.Put([]byte(username), buf)
  166. if err == nil {
  167. providerLog(logger.LevelDebug, "last login updated for user %#v", username)
  168. } else {
  169. providerLog(logger.LevelWarn, "error updating last login for user %#v: %v", username, err)
  170. }
  171. return err
  172. })
  173. }
  174. func (p BoltProvider) updateQuota(username string, filesAdd int, sizeAdd int64, reset bool) error {
  175. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  176. bucket, _, err := getBuckets(tx)
  177. if err != nil {
  178. return err
  179. }
  180. var u []byte
  181. if u = bucket.Get([]byte(username)); u == nil {
  182. return &RecordNotFoundError{err: fmt.Sprintf("username %#v does not exist, unable to update quota", username)}
  183. }
  184. var user User
  185. err = json.Unmarshal(u, &user)
  186. if err != nil {
  187. return err
  188. }
  189. if reset {
  190. user.UsedQuotaSize = sizeAdd
  191. user.UsedQuotaFiles = filesAdd
  192. } else {
  193. user.UsedQuotaSize += sizeAdd
  194. user.UsedQuotaFiles += filesAdd
  195. }
  196. user.LastQuotaUpdate = utils.GetTimeAsMsSinceEpoch(time.Now())
  197. buf, err := json.Marshal(user)
  198. if err != nil {
  199. return err
  200. }
  201. return bucket.Put([]byte(username), buf)
  202. })
  203. }
  204. func (p BoltProvider) getUsedQuota(username string) (int, int64, error) {
  205. user, err := p.userExists(username)
  206. if err != nil {
  207. providerLog(logger.LevelWarn, "unable to get quota for user %v error: %v", username, err)
  208. return 0, 0, err
  209. }
  210. return user.UsedQuotaFiles, user.UsedQuotaSize, err
  211. }
  212. func (p BoltProvider) userExists(username string) (User, error) {
  213. var user User
  214. err := p.dbHandle.View(func(tx *bolt.Tx) error {
  215. bucket, _, err := getBuckets(tx)
  216. if err != nil {
  217. return err
  218. }
  219. u := bucket.Get([]byte(username))
  220. if u == nil {
  221. return &RecordNotFoundError{err: fmt.Sprintf("username %v does not exist", username)}
  222. }
  223. return json.Unmarshal(u, &user)
  224. })
  225. return user, err
  226. }
  227. func (p BoltProvider) addUser(user User) error {
  228. err := validateUser(&user)
  229. if err != nil {
  230. return err
  231. }
  232. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  233. bucket, idxBucket, err := getBuckets(tx)
  234. if err != nil {
  235. return err
  236. }
  237. if u := bucket.Get([]byte(user.Username)); u != nil {
  238. return fmt.Errorf("username %v already exists", user.Username)
  239. }
  240. id, err := bucket.NextSequence()
  241. if err != nil {
  242. return err
  243. }
  244. user.ID = int64(id)
  245. buf, err := json.Marshal(user)
  246. if err != nil {
  247. return err
  248. }
  249. userIDAsBytes := itob(user.ID)
  250. err = bucket.Put([]byte(user.Username), buf)
  251. if err != nil {
  252. return err
  253. }
  254. return idxBucket.Put(userIDAsBytes, []byte(user.Username))
  255. })
  256. }
  257. func (p BoltProvider) updateUser(user User) error {
  258. err := validateUser(&user)
  259. if err != nil {
  260. return err
  261. }
  262. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  263. bucket, _, err := getBuckets(tx)
  264. if err != nil {
  265. return err
  266. }
  267. if u := bucket.Get([]byte(user.Username)); u == nil {
  268. return &RecordNotFoundError{err: fmt.Sprintf("username %v does not exist", user.Username)}
  269. }
  270. buf, err := json.Marshal(user)
  271. if err != nil {
  272. return err
  273. }
  274. return bucket.Put([]byte(user.Username), buf)
  275. })
  276. }
  277. func (p BoltProvider) deleteUser(user User) error {
  278. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  279. bucket, idxBucket, err := getBuckets(tx)
  280. if err != nil {
  281. return err
  282. }
  283. userIDAsBytes := itob(user.ID)
  284. userName := idxBucket.Get(userIDAsBytes)
  285. if userName == nil {
  286. return &RecordNotFoundError{err: fmt.Sprintf("user with id %v does not exist", user.ID)}
  287. }
  288. err = bucket.Delete(userName)
  289. if err != nil {
  290. return err
  291. }
  292. return idxBucket.Delete(userIDAsBytes)
  293. })
  294. }
  295. func (p BoltProvider) dumpUsers() ([]User, error) {
  296. users := []User{}
  297. err := p.dbHandle.View(func(tx *bolt.Tx) error {
  298. bucket, _, err := getBuckets(tx)
  299. if err != nil {
  300. return err
  301. }
  302. cursor := bucket.Cursor()
  303. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  304. var user User
  305. err = json.Unmarshal(v, &user)
  306. if err != nil {
  307. return err
  308. }
  309. err = addCredentialsToUser(&user)
  310. if err != nil {
  311. return err
  312. }
  313. users = append(users, user)
  314. }
  315. return err
  316. })
  317. return users, err
  318. }
  319. func (p BoltProvider) getUserWithUsername(username string) ([]User, error) {
  320. users := []User{}
  321. var user User
  322. user, err := p.userExists(username)
  323. if err == nil {
  324. users = append(users, HideUserSensitiveData(&user))
  325. return users, nil
  326. }
  327. if _, ok := err.(*RecordNotFoundError); ok {
  328. err = nil
  329. }
  330. return users, err
  331. }
  332. func (p BoltProvider) getUsers(limit int, offset int, order string, username string) ([]User, error) {
  333. users := []User{}
  334. var err error
  335. if limit <= 0 {
  336. return users, err
  337. }
  338. if len(username) > 0 {
  339. if offset == 0 {
  340. return p.getUserWithUsername(username)
  341. }
  342. return users, err
  343. }
  344. err = p.dbHandle.View(func(tx *bolt.Tx) error {
  345. bucket, _, err := getBuckets(tx)
  346. if err != nil {
  347. return err
  348. }
  349. cursor := bucket.Cursor()
  350. itNum := 0
  351. if order == "ASC" {
  352. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  353. itNum++
  354. if itNum <= offset {
  355. continue
  356. }
  357. var user User
  358. err = json.Unmarshal(v, &user)
  359. if err == nil {
  360. users = append(users, HideUserSensitiveData(&user))
  361. }
  362. if len(users) >= limit {
  363. break
  364. }
  365. }
  366. } else {
  367. for k, v := cursor.Last(); k != nil; k, v = cursor.Prev() {
  368. itNum++
  369. if itNum <= offset {
  370. continue
  371. }
  372. var user User
  373. err = json.Unmarshal(v, &user)
  374. if err == nil {
  375. users = append(users, HideUserSensitiveData(&user))
  376. }
  377. if len(users) >= limit {
  378. break
  379. }
  380. }
  381. }
  382. return err
  383. })
  384. return users, err
  385. }
  386. func (p BoltProvider) close() error {
  387. return p.dbHandle.Close()
  388. }
  389. func (p BoltProvider) reloadConfig() error {
  390. return nil
  391. }
  392. // initializeDatabase does nothing, no initilization is needed for bolt provider
  393. func (p BoltProvider) initializeDatabase() error {
  394. return errNoInitRequired
  395. }
  396. func (p BoltProvider) migrateDatabase() error {
  397. dbVersion, err := getBoltDatabaseVersion(p.dbHandle)
  398. if err != nil {
  399. return err
  400. }
  401. if dbVersion.Version == boltDatabaseVersion {
  402. providerLog(logger.LevelDebug, "bolt database is updated, current version: %v", dbVersion.Version)
  403. return nil
  404. }
  405. switch dbVersion.Version {
  406. case 1:
  407. err = updateDatabaseFrom1To2(p.dbHandle)
  408. if err != nil {
  409. return err
  410. }
  411. return updateDatabaseFrom2To3(p.dbHandle)
  412. case 2:
  413. return updateDatabaseFrom2To3(p.dbHandle)
  414. default:
  415. return fmt.Errorf("Database version not handled: %v", dbVersion.Version)
  416. }
  417. }
  418. // itob returns an 8-byte big endian representation of v.
  419. func itob(v int64) []byte {
  420. b := make([]byte, 8)
  421. binary.BigEndian.PutUint64(b, uint64(v))
  422. return b
  423. }
  424. func getBuckets(tx *bolt.Tx) (*bolt.Bucket, *bolt.Bucket, error) {
  425. var err error
  426. bucket := tx.Bucket(usersBucket)
  427. idxBucket := tx.Bucket(usersIDIdxBucket)
  428. if bucket == nil || idxBucket == nil {
  429. err = fmt.Errorf("unable to find required buckets, bolt database structure not correcly defined")
  430. }
  431. return bucket, idxBucket, err
  432. }
  433. func updateDatabaseFrom1To2(dbHandle *bolt.DB) error {
  434. providerLog(logger.LevelInfo, "updating bolt database version: 1 -> 2")
  435. usernames, err := getBoltAvailableUsernames(dbHandle)
  436. if err != nil {
  437. return err
  438. }
  439. for _, u := range usernames {
  440. user, err := provider.userExists(u)
  441. if err != nil {
  442. return err
  443. }
  444. user.Status = 1
  445. err = provider.updateUser(user)
  446. if err != nil {
  447. return err
  448. }
  449. providerLog(logger.LevelInfo, "user %#v updated, \"status\" setted to 1", user.Username)
  450. }
  451. return updateBoltDatabaseVersion(dbHandle, 2)
  452. }
  453. func updateDatabaseFrom2To3(dbHandle *bolt.DB) error {
  454. providerLog(logger.LevelInfo, "updating bolt database version: 2 -> 3")
  455. users := []User{}
  456. err := dbHandle.View(func(tx *bolt.Tx) error {
  457. bucket, _, err := getBuckets(tx)
  458. if err != nil {
  459. return err
  460. }
  461. cursor := bucket.Cursor()
  462. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  463. var compatUser compatUserV2
  464. err = json.Unmarshal(v, &compatUser)
  465. if err == nil {
  466. user := User{}
  467. user.ID = compatUser.ID
  468. user.Username = compatUser.Username
  469. user.Password = compatUser.Password
  470. user.PublicKeys = compatUser.PublicKeys
  471. user.HomeDir = compatUser.HomeDir
  472. user.UID = compatUser.UID
  473. user.GID = compatUser.GID
  474. user.MaxSessions = compatUser.MaxSessions
  475. user.QuotaSize = compatUser.QuotaSize
  476. user.QuotaFiles = compatUser.QuotaFiles
  477. user.Permissions = make(map[string][]string)
  478. user.Permissions["/"] = compatUser.Permissions
  479. user.UsedQuotaSize = compatUser.UsedQuotaSize
  480. user.UsedQuotaFiles = compatUser.UsedQuotaFiles
  481. user.LastQuotaUpdate = compatUser.LastQuotaUpdate
  482. user.UploadBandwidth = compatUser.UploadBandwidth
  483. user.DownloadBandwidth = compatUser.DownloadBandwidth
  484. user.ExpirationDate = compatUser.ExpirationDate
  485. user.LastLogin = compatUser.LastLogin
  486. user.Status = compatUser.Status
  487. users = append(users, user)
  488. }
  489. }
  490. return err
  491. })
  492. if err != nil {
  493. return err
  494. }
  495. for _, user := range users {
  496. err = provider.updateUser(user)
  497. if err != nil {
  498. return err
  499. }
  500. providerLog(logger.LevelInfo, "user %#v updated, \"permissions\" setted to %+v", user.Username, user.Permissions)
  501. }
  502. return updateBoltDatabaseVersion(dbHandle, 3)
  503. }
  504. func getBoltAvailableUsernames(dbHandle *bolt.DB) ([]string, error) {
  505. usernames := []string{}
  506. err := dbHandle.View(func(tx *bolt.Tx) error {
  507. _, idxBucket, err := getBuckets(tx)
  508. if err != nil {
  509. return err
  510. }
  511. cursor := idxBucket.Cursor()
  512. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  513. usernames = append(usernames, string(v))
  514. }
  515. return nil
  516. })
  517. return usernames, err
  518. }
  519. func getBoltDatabaseVersion(dbHandle *bolt.DB) (schemaVersion, error) {
  520. var dbVersion schemaVersion
  521. err := dbHandle.View(func(tx *bolt.Tx) error {
  522. bucket := tx.Bucket(dbVersionBucket)
  523. if bucket == nil {
  524. return fmt.Errorf("unable to find database version bucket")
  525. }
  526. v := bucket.Get(dbVersionKey)
  527. if v == nil {
  528. dbVersion = schemaVersion{
  529. Version: 1,
  530. }
  531. return nil
  532. }
  533. return json.Unmarshal(v, &dbVersion)
  534. })
  535. return dbVersion, err
  536. }
  537. func updateBoltDatabaseVersion(dbHandle *bolt.DB, version int) error {
  538. err := dbHandle.Update(func(tx *bolt.Tx) error {
  539. bucket := tx.Bucket(dbVersionBucket)
  540. if bucket == nil {
  541. return fmt.Errorf("unable to find database version bucket")
  542. }
  543. newDbVersion := schemaVersion{
  544. Version: version,
  545. }
  546. buf, err := json.Marshal(newDbVersion)
  547. if err != nil {
  548. return err
  549. }
  550. return bucket.Put(dbVersionKey, buf)
  551. })
  552. return err
  553. }