bolt.go 15 KB


  1. package dataprovider
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "path/filepath"
  8. "time"
  9. "github.com/drakkan/sftpgo/logger"
  10. "github.com/drakkan/sftpgo/utils"
  11. bolt "go.etcd.io/bbolt"
  12. )
  // boltDatabaseVersion is the schema version this provider works with;
  // migrateDatabase compares the stored version against it.
  const boltDatabaseVersion = 3
  // Bucket names and keys used inside the bolt database.
  var (
  	// usersBucket maps a username to the JSON-encoded user record.
  	usersBucket = []byte("users")
  	// usersIDIdxBucket maps the 8-byte big endian user ID to the username.
  	usersIDIdxBucket = []byte("users_id_idx")
  	// dbVersionBucket holds the JSON-encoded schema version.
  	dbVersionBucket = []byte("db_version")
  	// dbVersionKey is the key, inside dbVersionBucket, of the schema version.
  	dbVersionKey = []byte("version")
  )
  22. // BoltProvider auth provider for bolt key/value store
  23. type BoltProvider struct {
  24. dbHandle *bolt.DB
  25. }
  // compatUserV2 is the JSON layout used to decode user records while
  // migrating the database from version 2 to version 3. Unlike the User
  // built by that migration, Permissions is a flat list here.
  type compatUserV2 struct {
  	ID                int64    `json:"id"`
  	Username          string   `json:"username"`
  	Password          string   `json:"password,omitempty"`
  	PublicKeys        []string `json:"public_keys,omitempty"`
  	HomeDir           string   `json:"home_dir"`
  	UID               int      `json:"uid"`
  	GID               int      `json:"gid"`
  	MaxSessions       int      `json:"max_sessions"`
  	QuotaSize         int64    `json:"quota_size"`
  	QuotaFiles        int      `json:"quota_files"`
  	Permissions       []string `json:"permissions"`
  	UsedQuotaSize     int64    `json:"used_quota_size"`
  	UsedQuotaFiles    int      `json:"used_quota_files"`
  	LastQuotaUpdate   int64    `json:"last_quota_update"`
  	UploadBandwidth   int64    `json:"upload_bandwidth"`
  	DownloadBandwidth int64    `json:"download_bandwidth"`
  	ExpirationDate    int64    `json:"expiration_date"`
  	LastLogin         int64    `json:"last_login"`
  	Status            int      `json:"status"`
  }
  47. func initializeBoltProvider(basePath string) error {
  48. var err error
  49. logSender = BoltDataProviderName
  50. dbPath := config.Name
  51. if !utils.IsFileInputValid(dbPath) {
  52. return fmt.Errorf("Invalid database path: %#v", dbPath)
  53. }
  54. if !filepath.IsAbs(dbPath) {
  55. dbPath = filepath.Join(basePath, dbPath)
  56. }
  57. dbHandle, err := bolt.Open(dbPath, 0600, &bolt.Options{
  58. NoGrowSync: false,
  59. FreelistType: bolt.FreelistArrayType,
  60. Timeout: 5 * time.Second})
  61. if err == nil {
  62. providerLog(logger.LevelDebug, "bolt key store handle created")
  63. err = dbHandle.Update(func(tx *bolt.Tx) error {
  64. _, e := tx.CreateBucketIfNotExists(usersBucket)
  65. return e
  66. })
  67. if err != nil {
  68. providerLog(logger.LevelWarn, "error creating users bucket: %v", err)
  69. return err
  70. }
  71. err = dbHandle.Update(func(tx *bolt.Tx) error {
  72. _, e := tx.CreateBucketIfNotExists(usersIDIdxBucket)
  73. return e
  74. })
  75. if err != nil {
  76. providerLog(logger.LevelWarn, "error creating username idx bucket: %v", err)
  77. return err
  78. }
  79. err = dbHandle.Update(func(tx *bolt.Tx) error {
  80. _, e := tx.CreateBucketIfNotExists(dbVersionBucket)
  81. return e
  82. })
  83. if err != nil {
  84. providerLog(logger.LevelWarn, "error creating database version bucket: %v", err)
  85. return err
  86. }
  87. provider = BoltProvider{dbHandle: dbHandle}
  88. } else {
  89. providerLog(logger.LevelWarn, "error creating bolt key/value store handler: %v", err)
  90. }
  91. return err
  92. }
  93. func (p BoltProvider) checkAvailability() error {
  94. _, err := p.getUsers(1, 0, "ASC", "")
  95. return err
  96. }
  97. func (p BoltProvider) validateUserAndPass(username string, password string) (User, error) {
  98. var user User
  99. if len(password) == 0 {
  100. return user, errors.New("Credentials cannot be null or empty")
  101. }
  102. user, err := p.userExists(username)
  103. if err != nil {
  104. providerLog(logger.LevelWarn, "error authenticating user: %v, error: %v", username, err)
  105. return user, err
  106. }
  107. return checkUserAndPass(user, password)
  108. }
  109. func (p BoltProvider) validateUserAndPubKey(username string, pubKey string) (User, string, error) {
  110. var user User
  111. if len(pubKey) == 0 {
  112. return user, "", errors.New("Credentials cannot be null or empty")
  113. }
  114. user, err := p.userExists(username)
  115. if err != nil {
  116. providerLog(logger.LevelWarn, "error authenticating user: %v, error: %v", username, err)
  117. return user, "", err
  118. }
  119. return checkUserAndPubKey(user, pubKey)
  120. }
  121. func (p BoltProvider) getUserByID(ID int64) (User, error) {
  122. var user User
  123. err := p.dbHandle.View(func(tx *bolt.Tx) error {
  124. bucket, idxBucket, err := getBuckets(tx)
  125. if err != nil {
  126. return err
  127. }
  128. userIDAsBytes := itob(ID)
  129. username := idxBucket.Get(userIDAsBytes)
  130. if username == nil {
  131. return &RecordNotFoundError{err: fmt.Sprintf("user with ID %v does not exist", ID)}
  132. }
  133. u := bucket.Get(username)
  134. if u == nil {
  135. return &RecordNotFoundError{err: fmt.Sprintf("username %#v and ID: %v does not exist", string(username), ID)}
  136. }
  137. return json.Unmarshal(u, &user)
  138. })
  139. return user, err
  140. }
  141. func (p BoltProvider) updateLastLogin(username string) error {
  142. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  143. bucket, _, err := getBuckets(tx)
  144. if err != nil {
  145. return err
  146. }
  147. var u []byte
  148. if u = bucket.Get([]byte(username)); u == nil {
  149. return &RecordNotFoundError{err: fmt.Sprintf("username %#v does not exist, unable to update last login", username)}
  150. }
  151. var user User
  152. err = json.Unmarshal(u, &user)
  153. if err != nil {
  154. return err
  155. }
  156. user.LastLogin = utils.GetTimeAsMsSinceEpoch(time.Now())
  157. buf, err := json.Marshal(user)
  158. if err != nil {
  159. return err
  160. }
  161. return bucket.Put([]byte(username), buf)
  162. })
  163. }
  164. func (p BoltProvider) updateQuota(username string, filesAdd int, sizeAdd int64, reset bool) error {
  165. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  166. bucket, _, err := getBuckets(tx)
  167. if err != nil {
  168. return err
  169. }
  170. var u []byte
  171. if u = bucket.Get([]byte(username)); u == nil {
  172. return &RecordNotFoundError{err: fmt.Sprintf("username %#v does not exist, unable to update quota", username)}
  173. }
  174. var user User
  175. err = json.Unmarshal(u, &user)
  176. if err != nil {
  177. return err
  178. }
  179. if reset {
  180. user.UsedQuotaSize = sizeAdd
  181. user.UsedQuotaFiles = filesAdd
  182. } else {
  183. user.UsedQuotaSize += sizeAdd
  184. user.UsedQuotaFiles += filesAdd
  185. }
  186. user.LastQuotaUpdate = utils.GetTimeAsMsSinceEpoch(time.Now())
  187. buf, err := json.Marshal(user)
  188. if err != nil {
  189. return err
  190. }
  191. return bucket.Put([]byte(username), buf)
  192. })
  193. }
  194. func (p BoltProvider) getUsedQuota(username string) (int, int64, error) {
  195. user, err := p.userExists(username)
  196. if err != nil {
  197. providerLog(logger.LevelWarn, "unable to get quota for user %v error: %v", username, err)
  198. return 0, 0, err
  199. }
  200. return user.UsedQuotaFiles, user.UsedQuotaSize, err
  201. }
  202. func (p BoltProvider) userExists(username string) (User, error) {
  203. var user User
  204. err := p.dbHandle.View(func(tx *bolt.Tx) error {
  205. bucket, _, err := getBuckets(tx)
  206. if err != nil {
  207. return err
  208. }
  209. u := bucket.Get([]byte(username))
  210. if u == nil {
  211. return &RecordNotFoundError{err: fmt.Sprintf("username %v does not exist", username)}
  212. }
  213. return json.Unmarshal(u, &user)
  214. })
  215. return user, err
  216. }
  217. func (p BoltProvider) addUser(user User) error {
  218. err := validateUser(&user)
  219. if err != nil {
  220. return err
  221. }
  222. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  223. bucket, idxBucket, err := getBuckets(tx)
  224. if err != nil {
  225. return err
  226. }
  227. if u := bucket.Get([]byte(user.Username)); u != nil {
  228. return fmt.Errorf("username %v already exists", user.Username)
  229. }
  230. id, err := bucket.NextSequence()
  231. if err != nil {
  232. return err
  233. }
  234. user.ID = int64(id)
  235. buf, err := json.Marshal(user)
  236. if err != nil {
  237. return err
  238. }
  239. userIDAsBytes := itob(user.ID)
  240. err = bucket.Put([]byte(user.Username), buf)
  241. if err != nil {
  242. return err
  243. }
  244. return idxBucket.Put(userIDAsBytes, []byte(user.Username))
  245. })
  246. }
  247. func (p BoltProvider) updateUser(user User) error {
  248. err := validateUser(&user)
  249. if err != nil {
  250. return err
  251. }
  252. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  253. bucket, _, err := getBuckets(tx)
  254. if err != nil {
  255. return err
  256. }
  257. if u := bucket.Get([]byte(user.Username)); u == nil {
  258. return &RecordNotFoundError{err: fmt.Sprintf("username %v does not exist", user.Username)}
  259. }
  260. buf, err := json.Marshal(user)
  261. if err != nil {
  262. return err
  263. }
  264. return bucket.Put([]byte(user.Username), buf)
  265. })
  266. }
  267. func (p BoltProvider) deleteUser(user User) error {
  268. return p.dbHandle.Update(func(tx *bolt.Tx) error {
  269. bucket, idxBucket, err := getBuckets(tx)
  270. if err != nil {
  271. return err
  272. }
  273. userIDAsBytes := itob(user.ID)
  274. userName := idxBucket.Get(userIDAsBytes)
  275. if userName == nil {
  276. return &RecordNotFoundError{err: fmt.Sprintf("user with id %v does not exist", user.ID)}
  277. }
  278. err = bucket.Delete(userName)
  279. if err != nil {
  280. return err
  281. }
  282. return idxBucket.Delete(userIDAsBytes)
  283. })
  284. }
  285. func (p BoltProvider) dumpUsers() ([]User, error) {
  286. users := []User{}
  287. var err error
  288. err = p.dbHandle.View(func(tx *bolt.Tx) error {
  289. bucket, _, err := getBuckets(tx)
  290. if err != nil {
  291. return err
  292. }
  293. cursor := bucket.Cursor()
  294. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  295. var user User
  296. err = json.Unmarshal(v, &user)
  297. if err != nil {
  298. return err
  299. }
  300. err = addCredentialsToUser(&user)
  301. if err != nil {
  302. return err
  303. }
  304. users = append(users, user)
  305. }
  306. return err
  307. })
  308. return users, err
  309. }
  310. func (p BoltProvider) getUserWithUsername(username string) ([]User, error) {
  311. users := []User{}
  312. var user User
  313. user, err := p.userExists(username)
  314. if err == nil {
  315. users = append(users, HideUserSensitiveData(&user))
  316. return users, nil
  317. }
  318. if _, ok := err.(*RecordNotFoundError); ok {
  319. err = nil
  320. }
  321. return users, err
  322. }
  323. func (p BoltProvider) getUsers(limit int, offset int, order string, username string) ([]User, error) {
  324. users := []User{}
  325. var err error
  326. if limit <= 0 {
  327. return users, err
  328. }
  329. if len(username) > 0 {
  330. if offset == 0 {
  331. return p.getUserWithUsername(username)
  332. }
  333. return users, err
  334. }
  335. err = p.dbHandle.View(func(tx *bolt.Tx) error {
  336. bucket, _, err := getBuckets(tx)
  337. if err != nil {
  338. return err
  339. }
  340. cursor := bucket.Cursor()
  341. itNum := 0
  342. if order == "ASC" {
  343. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  344. itNum++
  345. if itNum <= offset {
  346. continue
  347. }
  348. var user User
  349. err = json.Unmarshal(v, &user)
  350. if err == nil {
  351. users = append(users, HideUserSensitiveData(&user))
  352. }
  353. if len(users) >= limit {
  354. break
  355. }
  356. }
  357. } else {
  358. for k, v := cursor.Last(); k != nil; k, v = cursor.Prev() {
  359. itNum++
  360. if itNum <= offset {
  361. continue
  362. }
  363. var user User
  364. err = json.Unmarshal(v, &user)
  365. if err == nil {
  366. users = append(users, HideUserSensitiveData(&user))
  367. }
  368. if len(users) >= limit {
  369. break
  370. }
  371. }
  372. }
  373. return err
  374. })
  375. return users, err
  376. }
  377. func (p BoltProvider) close() error {
  378. return p.dbHandle.Close()
  379. }
  380. func (p BoltProvider) reloadConfig() error {
  381. return nil
  382. }
  383. // initializeDatabase does nothing, no initilization is needed for bolt provider
  384. func (p BoltProvider) initializeDatabase() error {
  385. return errNoInitRequired
  386. }
  387. func (p BoltProvider) migrateDatabase() error {
  388. dbVersion, err := getBoltDatabaseVersion(p.dbHandle)
  389. if err != nil {
  390. return err
  391. }
  392. if dbVersion.Version == boltDatabaseVersion {
  393. providerLog(logger.LevelDebug, "bolt database is updated, current version: %v", dbVersion.Version)
  394. return nil
  395. }
  396. if dbVersion.Version == 1 {
  397. err = updateDatabaseFrom1To2(p.dbHandle)
  398. if err != nil {
  399. return err
  400. }
  401. return updateDatabaseFrom2To3(p.dbHandle)
  402. } else if dbVersion.Version == 2 {
  403. return updateDatabaseFrom2To3(p.dbHandle)
  404. }
  405. return nil
  406. }
  // itob encodes v as 8 bytes in big endian order, the key format of the
  // ID index bucket.
  func itob(v int64) []byte {
  	var encoded [8]byte
  	binary.BigEndian.PutUint64(encoded[:], uint64(v))
  	return encoded[:]
  }
  413. func getBuckets(tx *bolt.Tx) (*bolt.Bucket, *bolt.Bucket, error) {
  414. var err error
  415. bucket := tx.Bucket(usersBucket)
  416. idxBucket := tx.Bucket(usersIDIdxBucket)
  417. if bucket == nil || idxBucket == nil {
  418. err = fmt.Errorf("unable to find required buckets, bolt database structure not correcly defined")
  419. }
  420. return bucket, idxBucket, err
  421. }
  422. func updateDatabaseFrom1To2(dbHandle *bolt.DB) error {
  423. providerLog(logger.LevelInfo, "updating bolt database version: 1 -> 2")
  424. usernames, err := getBoltAvailableUsernames(dbHandle)
  425. if err != nil {
  426. return err
  427. }
  428. for _, u := range usernames {
  429. user, err := provider.userExists(u)
  430. if err != nil {
  431. return err
  432. }
  433. user.Status = 1
  434. err = provider.updateUser(user)
  435. if err != nil {
  436. return err
  437. }
  438. providerLog(logger.LevelInfo, "user %#v updated, \"status\" setted to 1", user.Username)
  439. }
  440. return updateBoltDatabaseVersion(dbHandle, 2)
  441. }
  442. func updateDatabaseFrom2To3(dbHandle *bolt.DB) error {
  443. providerLog(logger.LevelInfo, "updating bolt database version: 2 -> 3")
  444. users := []User{}
  445. err := dbHandle.View(func(tx *bolt.Tx) error {
  446. bucket, _, err := getBuckets(tx)
  447. if err != nil {
  448. return err
  449. }
  450. cursor := bucket.Cursor()
  451. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  452. var compatUser compatUserV2
  453. err = json.Unmarshal(v, &compatUser)
  454. if err == nil {
  455. user := User{}
  456. user.ID = compatUser.ID
  457. user.Username = compatUser.Username
  458. user.Password = compatUser.Password
  459. user.PublicKeys = compatUser.PublicKeys
  460. user.HomeDir = compatUser.HomeDir
  461. user.UID = compatUser.UID
  462. user.GID = compatUser.GID
  463. user.MaxSessions = compatUser.MaxSessions
  464. user.QuotaSize = compatUser.QuotaSize
  465. user.QuotaFiles = compatUser.QuotaFiles
  466. user.Permissions = make(map[string][]string)
  467. user.Permissions["/"] = compatUser.Permissions
  468. user.UsedQuotaSize = compatUser.UsedQuotaSize
  469. user.UsedQuotaFiles = compatUser.UsedQuotaFiles
  470. user.LastQuotaUpdate = compatUser.LastQuotaUpdate
  471. user.UploadBandwidth = compatUser.UploadBandwidth
  472. user.DownloadBandwidth = compatUser.DownloadBandwidth
  473. user.ExpirationDate = compatUser.ExpirationDate
  474. user.LastLogin = compatUser.LastLogin
  475. user.Status = compatUser.Status
  476. users = append(users, user)
  477. }
  478. }
  479. return err
  480. })
  481. if err != nil {
  482. return err
  483. }
  484. for _, user := range users {
  485. err = provider.updateUser(user)
  486. if err != nil {
  487. return err
  488. }
  489. providerLog(logger.LevelInfo, "user %#v updated, \"permissions\" setted to %+v", user.Username, user.Permissions)
  490. }
  491. return updateBoltDatabaseVersion(dbHandle, 3)
  492. }
  493. func getBoltAvailableUsernames(dbHandle *bolt.DB) ([]string, error) {
  494. usernames := []string{}
  495. err := dbHandle.View(func(tx *bolt.Tx) error {
  496. _, idxBucket, err := getBuckets(tx)
  497. if err != nil {
  498. return err
  499. }
  500. cursor := idxBucket.Cursor()
  501. for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
  502. usernames = append(usernames, string(v))
  503. }
  504. return nil
  505. })
  506. return usernames, err
  507. }
  508. func getBoltDatabaseVersion(dbHandle *bolt.DB) (schemaVersion, error) {
  509. var dbVersion schemaVersion
  510. err := dbHandle.View(func(tx *bolt.Tx) error {
  511. bucket := tx.Bucket(dbVersionBucket)
  512. if bucket == nil {
  513. return fmt.Errorf("unable to find database version bucket")
  514. }
  515. v := bucket.Get(dbVersionKey)
  516. if v == nil {
  517. dbVersion = schemaVersion{
  518. Version: 1,
  519. }
  520. return nil
  521. }
  522. return json.Unmarshal(v, &dbVersion)
  523. })
  524. return dbVersion, err
  525. }
  526. func updateBoltDatabaseVersion(dbHandle *bolt.DB, version int) error {
  527. err := dbHandle.Update(func(tx *bolt.Tx) error {
  528. bucket := tx.Bucket(dbVersionBucket)
  529. if bucket == nil {
  530. return fmt.Errorf("unable to find database version bucket")
  531. }
  532. newDbVersion := schemaVersion{
  533. Version: version,
  534. }
  535. buf, err := json.Marshal(newDbVersion)
  536. if err != nil {
  537. return err
  538. }
  539. return bucket.Put(dbVersionKey, buf)
  540. })
  541. return err
  542. }