gcsfs.go 18 KB


  1. // +build !nogcs
  2. package vfs
  3. import (
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "mime"
  10. "net/http"
  11. "os"
  12. "path"
  13. "path/filepath"
  14. "strings"
  15. "time"
  16. "cloud.google.com/go/storage"
  17. "github.com/eikenb/pipeat"
  18. "github.com/pkg/sftp"
  19. "google.golang.org/api/googleapi"
  20. "google.golang.org/api/iterator"
  21. "google.golang.org/api/option"
  22. "github.com/drakkan/sftpgo/kms"
  23. "github.com/drakkan/sftpgo/logger"
  24. "github.com/drakkan/sftpgo/metrics"
  25. "github.com/drakkan/sftpgo/version"
  26. )
  27. var (
  28. gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType"}
  29. )
// GCSFs is a Fs implementation for Google Cloud Storage.
type GCSFs struct {
	// connectionID is the value returned by ConnectionID.
	connectionID string
	// localTempDir is the directory passed to pipeat.PipeInDir for the
	// transfer pipes and to NewOsFs in CheckRootPath.
	localTempDir string
	// config holds the bucket name, key prefix, storage class and
	// credentials settings read by the methods below.
	config *GCSFsConfig
	// svc is the storage client created by NewGCSFs.
	svc *storage.Client
	// ctxTimeout bounds single-object calls, bucket checks, ReadDir and Walk.
	ctxTimeout time.Duration
	// ctxLongTimeout bounds ScanRootDirContents and hasContents.
	ctxLongTimeout time.Duration
}
// init reports the "+gcs" feature to the version package.
// NOTE(review): presumably this makes GCS support visible in the build's
// feature list — confirm against version.AddFeature.
func init() {
	version.AddFeature("+gcs")
}
  42. // NewGCSFs returns an GCSFs object that allows to interact with Google Cloud Storage
  43. func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error) {
  44. var err error
  45. fs := &GCSFs{
  46. connectionID: connectionID,
  47. localTempDir: localTempDir,
  48. config: &config,
  49. ctxTimeout: 30 * time.Second,
  50. ctxLongTimeout: 300 * time.Second,
  51. }
  52. if err = fs.config.Validate(fs.config.CredentialFile); err != nil {
  53. return fs, err
  54. }
  55. ctx := context.Background()
  56. if fs.config.AutomaticCredentials > 0 {
  57. fs.svc, err = storage.NewClient(ctx)
  58. } else if !fs.config.Credentials.IsEmpty() {
  59. if fs.config.Credentials.IsEncrypted() {
  60. err = fs.config.Credentials.Decrypt()
  61. if err != nil {
  62. return fs, err
  63. }
  64. }
  65. fs.svc, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(fs.config.Credentials.GetPayload())))
  66. } else {
  67. var creds []byte
  68. creds, err = os.ReadFile(fs.config.CredentialFile)
  69. if err != nil {
  70. return fs, err
  71. }
  72. secret := kms.NewEmptySecret()
  73. err = json.Unmarshal(creds, secret)
  74. if err != nil {
  75. return fs, err
  76. }
  77. err = secret.Decrypt()
  78. if err != nil {
  79. return fs, err
  80. }
  81. fs.svc, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(secret.GetPayload())))
  82. }
  83. return fs, err
  84. }
// Name returns the name for the Fs implementation: the text "GCSFs bucket "
// followed by the configured bucket name formatted with %#v.
func (fs *GCSFs) Name() string {
	return fmt.Sprintf("GCSFs bucket %#v", fs.config.Bucket)
}
// ConnectionID returns the connection ID associated to this Fs implementation,
// as supplied to NewGCSFs.
func (fs *GCSFs) ConnectionID() string {
	return fs.connectionID
}
  93. // Stat returns a FileInfo describing the named file
  94. func (fs *GCSFs) Stat(name string) (os.FileInfo, error) {
  95. var result *FileInfo
  96. var err error
  97. if name == "" || name == "." {
  98. err := fs.checkIfBucketExists()
  99. if err != nil {
  100. return result, err
  101. }
  102. return NewFileInfo(name, true, 0, time.Now(), false), nil
  103. }
  104. if fs.config.KeyPrefix == name+"/" {
  105. return NewFileInfo(name, true, 0, time.Now(), false), nil
  106. }
  107. attrs, err := fs.headObject(name)
  108. if err == nil {
  109. objSize := attrs.Size
  110. objectModTime := attrs.Updated
  111. isDir := attrs.ContentType == dirMimeType || strings.HasSuffix(attrs.Name, "/")
  112. return NewFileInfo(name, isDir, objSize, objectModTime, false), nil
  113. }
  114. if !fs.IsNotExist(err) {
  115. return result, err
  116. }
  117. // now check if this is a prefix (virtual directory)
  118. hasContents, err := fs.hasContents(name)
  119. if err != nil {
  120. return nil, err
  121. }
  122. if hasContents {
  123. return NewFileInfo(name, true, 0, time.Now(), false), nil
  124. }
  125. return nil, errors.New("404 no such file or directory")
  126. }
// Lstat returns a FileInfo describing the named file.
// It simply delegates to Stat: symbolic links are not supported by this
// implementation (Symlink and Readlink return ErrVfsUnsupported).
func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
  131. // Open opens the named file for reading
  132. func (fs *GCSFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
  133. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  134. if err != nil {
  135. return nil, nil, nil, err
  136. }
  137. bkt := fs.svc.Bucket(fs.config.Bucket)
  138. obj := bkt.Object(name)
  139. ctx, cancelFn := context.WithCancel(context.Background())
  140. objectReader, err := obj.NewRangeReader(ctx, offset, -1)
  141. if err == nil && offset > 0 && objectReader.Attrs.ContentEncoding == "gzip" {
  142. err = fmt.Errorf("Range request is not possible for gzip content encoding, requested offset %v", offset)
  143. objectReader.Close()
  144. }
  145. if err != nil {
  146. r.Close()
  147. w.Close()
  148. cancelFn()
  149. return nil, nil, nil, err
  150. }
  151. go func() {
  152. defer cancelFn()
  153. defer objectReader.Close()
  154. n, err := io.Copy(w, objectReader)
  155. w.CloseWithError(err) //nolint:errcheck
  156. fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
  157. metrics.GCSTransferCompleted(n, 1, err)
  158. }()
  159. return nil, r, cancelFn, nil
  160. }
  161. // Create creates or opens the named file for writing
  162. func (fs *GCSFs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
  163. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  164. if err != nil {
  165. return nil, nil, nil, err
  166. }
  167. p := NewPipeWriter(w)
  168. bkt := fs.svc.Bucket(fs.config.Bucket)
  169. obj := bkt.Object(name)
  170. ctx, cancelFn := context.WithCancel(context.Background())
  171. objectWriter := obj.NewWriter(ctx)
  172. var contentType string
  173. if flag == -1 {
  174. contentType = dirMimeType
  175. } else {
  176. contentType = mime.TypeByExtension(path.Ext(name))
  177. }
  178. if contentType != "" {
  179. objectWriter.ObjectAttrs.ContentType = contentType
  180. }
  181. if fs.config.StorageClass != "" {
  182. objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass
  183. }
  184. go func() {
  185. defer cancelFn()
  186. n, err := io.Copy(objectWriter, r)
  187. closeErr := objectWriter.Close()
  188. if err == nil {
  189. err = closeErr
  190. }
  191. r.CloseWithError(err) //nolint:errcheck
  192. p.Done(err)
  193. fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, n, err)
  194. metrics.GCSTransferCompleted(n, 0, err)
  195. }()
  196. return nil, p, cancelFn, nil
  197. }
  198. // Rename renames (moves) source to target.
  199. // We don't support renaming non empty directories since we should
  200. // rename all the contents too and this could take long time: think
  201. // about directories with thousands of files, for each file we should
  202. // execute a CopyObject call.
  203. func (fs *GCSFs) Rename(source, target string) error {
  204. if source == target {
  205. return nil
  206. }
  207. fi, err := fs.Stat(source)
  208. if err != nil {
  209. return err
  210. }
  211. if fi.IsDir() {
  212. hasContents, err := fs.hasContents(source)
  213. if err != nil {
  214. return err
  215. }
  216. if hasContents {
  217. return fmt.Errorf("Cannot rename non empty directory: %#v", source)
  218. }
  219. }
  220. src := fs.svc.Bucket(fs.config.Bucket).Object(source)
  221. dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
  222. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  223. defer cancelFn()
  224. copier := dst.CopierFrom(src)
  225. if fs.config.StorageClass != "" {
  226. copier.StorageClass = fs.config.StorageClass
  227. }
  228. var contentType string
  229. if fi.IsDir() {
  230. contentType = dirMimeType
  231. } else {
  232. contentType = mime.TypeByExtension(path.Ext(source))
  233. }
  234. if contentType != "" {
  235. copier.ContentType = contentType
  236. }
  237. _, err = copier.Run(ctx)
  238. metrics.GCSCopyObjectCompleted(err)
  239. if err != nil {
  240. return err
  241. }
  242. return fs.Remove(source, fi.IsDir())
  243. }
  244. // Remove removes the named file or (empty) directory.
  245. func (fs *GCSFs) Remove(name string, isDir bool) error {
  246. if isDir {
  247. hasContents, err := fs.hasContents(name)
  248. if err != nil {
  249. return err
  250. }
  251. if hasContents {
  252. return fmt.Errorf("Cannot remove non empty directory: %#v", name)
  253. }
  254. }
  255. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  256. defer cancelFn()
  257. err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
  258. metrics.GCSDeleteObjectCompleted(err)
  259. return err
  260. }
  261. // Mkdir creates a new directory with the specified name and default permissions
  262. func (fs *GCSFs) Mkdir(name string) error {
  263. _, err := fs.Stat(name)
  264. if !fs.IsNotExist(err) {
  265. return err
  266. }
  267. _, w, _, err := fs.Create(name, -1)
  268. if err != nil {
  269. return err
  270. }
  271. return w.Close()
  272. }
// Symlink creates source as a symbolic link to target.
// It is not supported by this implementation and always returns ErrVfsUnsupported.
func (*GCSFs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}
// Readlink returns the destination of the named symbolic link.
// It is not supported by this implementation and always returns ErrVfsUnsupported.
func (*GCSFs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}
// Chown changes the numeric uid and gid of the named file.
// It is not supported by this implementation and always returns ErrVfsUnsupported.
func (*GCSFs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}
// Chmod changes the mode of the named file to mode.
// It is not supported by this implementation and always returns ErrVfsUnsupported.
func (*GCSFs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}
// Chtimes changes the access and modification times of the named file.
// It is not supported by this implementation and always returns ErrVfsUnsupported.
func (*GCSFs) Chtimes(name string, atime, mtime time.Time) error {
	return ErrVfsUnsupported
}
// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer.
// This method always returns ErrVfsUnsupported.
func (*GCSFs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}
  299. // ReadDir reads the directory named by dirname and returns
  300. // a list of directory entries.
  301. func (fs *GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
  302. var result []os.FileInfo
  303. // dirname must be already cleaned
  304. prefix := fs.getPrefix(dirname)
  305. query := &storage.Query{Prefix: prefix, Delimiter: "/"}
  306. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  307. if err != nil {
  308. return nil, err
  309. }
  310. prefixes := make(map[string]bool)
  311. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  312. defer cancelFn()
  313. bkt := fs.svc.Bucket(fs.config.Bucket)
  314. it := bkt.Objects(ctx, query)
  315. for {
  316. attrs, err := it.Next()
  317. if err == iterator.Done {
  318. break
  319. }
  320. if err != nil {
  321. metrics.GCSListObjectsCompleted(err)
  322. return result, err
  323. }
  324. if attrs.Prefix != "" {
  325. name, _ := fs.resolve(attrs.Prefix, prefix)
  326. if name == "" {
  327. continue
  328. }
  329. if _, ok := prefixes[name]; ok {
  330. continue
  331. }
  332. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  333. prefixes[name] = true
  334. } else {
  335. name, isDir := fs.resolve(attrs.Name, prefix)
  336. if name == "" {
  337. continue
  338. }
  339. if !attrs.Deleted.IsZero() {
  340. continue
  341. }
  342. if attrs.ContentType == dirMimeType {
  343. isDir = true
  344. }
  345. if isDir {
  346. // check if the dir is already included, it will be sent as blob prefix if it contains at least one item
  347. if _, ok := prefixes[name]; ok {
  348. continue
  349. }
  350. prefixes[name] = true
  351. }
  352. fi := NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false)
  353. result = append(result, fi)
  354. }
  355. }
  356. metrics.GCSListObjectsCompleted(nil)
  357. return result, nil
  358. }
// IsUploadResumeSupported returns true if upload resume is supported.
// SFTP resume is not supported on GCS, so this always returns false.
func (*GCSFs) IsUploadResumeSupported() bool {
	return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// GCS uploads are already atomic, we don't need to upload to a temporary
// file, so this always returns false.
func (*GCSFs) IsAtomicUploadSupported() bool {
	return false
}
  370. // IsNotExist returns a boolean indicating whether the error is known to
  371. // report that a file or directory does not exist
  372. func (*GCSFs) IsNotExist(err error) bool {
  373. if err == nil {
  374. return false
  375. }
  376. if err == storage.ErrObjectNotExist || err == storage.ErrBucketNotExist {
  377. return true
  378. }
  379. if e, ok := err.(*googleapi.Error); ok {
  380. if e.Code == http.StatusNotFound {
  381. return true
  382. }
  383. }
  384. return strings.Contains(err.Error(), "404")
  385. }
  386. // IsPermission returns a boolean indicating whether the error is known to
  387. // report that permission is denied.
  388. func (*GCSFs) IsPermission(err error) bool {
  389. if err == nil {
  390. return false
  391. }
  392. if e, ok := err.(*googleapi.Error); ok {
  393. if e.Code == http.StatusForbidden || e.Code == http.StatusUnauthorized {
  394. return true
  395. }
  396. }
  397. return strings.Contains(err.Error(), "403")
  398. }
  399. // IsNotSupported returns true if the error indicate an unsupported operation
  400. func (*GCSFs) IsNotSupported(err error) bool {
  401. if err == nil {
  402. return false
  403. }
  404. return err == ErrVfsUnsupported
  405. }
  406. // CheckRootPath creates the specified local root directory if it does not exists
  407. func (fs *GCSFs) CheckRootPath(username string, uid int, gid int) bool {
  408. // we need a local directory for temporary files
  409. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
  410. return osFs.CheckRootPath(username, uid, gid)
  411. }
  412. // ScanRootDirContents returns the number of files contained in the bucket,
  413. // and their size
  414. func (fs *GCSFs) ScanRootDirContents() (int, int64, error) {
  415. numFiles := 0
  416. size := int64(0)
  417. query := &storage.Query{Prefix: fs.config.KeyPrefix}
  418. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  419. if err != nil {
  420. return numFiles, size, err
  421. }
  422. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  423. defer cancelFn()
  424. bkt := fs.svc.Bucket(fs.config.Bucket)
  425. it := bkt.Objects(ctx, query)
  426. for {
  427. attrs, err := it.Next()
  428. if err == iterator.Done {
  429. break
  430. }
  431. if err != nil {
  432. metrics.GCSListObjectsCompleted(err)
  433. return numFiles, size, err
  434. }
  435. if !attrs.Deleted.IsZero() {
  436. continue
  437. }
  438. isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
  439. if isDir && attrs.Size == 0 {
  440. continue
  441. }
  442. numFiles++
  443. size += attrs.Size
  444. }
  445. metrics.GCSListObjectsCompleted(nil)
  446. return numFiles, size, err
  447. }
// GetDirSize returns the number of files and the size for a folder
// including any subfolders.
// It is not supported by this implementation and always returns ErrVfsUnsupported.
func (*GCSFs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}
// GetAtomicUploadPath returns the path to use for an atomic upload.
// GCS uploads are already atomic, we never call this method for GCS,
// so it always returns an empty string.
func (*GCSFs) GetAtomicUploadPath(name string) string {
	return ""
}
  458. // GetRelativePath returns the path for a file relative to the user's home dir.
  459. // This is the path as seen by SFTPGo users
  460. func (fs *GCSFs) GetRelativePath(name string) string {
  461. rel := path.Clean(name)
  462. if rel == "." {
  463. rel = ""
  464. }
  465. if !path.IsAbs(rel) {
  466. rel = "/" + rel
  467. }
  468. if fs.config.KeyPrefix != "" {
  469. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  470. rel = "/"
  471. }
  472. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  473. }
  474. return rel
  475. }
  476. // Walk walks the file tree rooted at root, calling walkFn for each file or
  477. // directory in the tree, including root
  478. func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
  479. prefix := ""
  480. if root != "" && root != "." {
  481. prefix = strings.TrimPrefix(root, "/")
  482. if !strings.HasSuffix(prefix, "/") {
  483. prefix += "/"
  484. }
  485. }
  486. query := &storage.Query{Prefix: prefix}
  487. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  488. if err != nil {
  489. walkFn(root, nil, err) //nolint:errcheck
  490. return err
  491. }
  492. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  493. defer cancelFn()
  494. bkt := fs.svc.Bucket(fs.config.Bucket)
  495. it := bkt.Objects(ctx, query)
  496. for {
  497. attrs, err := it.Next()
  498. if err == iterator.Done {
  499. break
  500. }
  501. if err != nil {
  502. walkFn(root, nil, err) //nolint:errcheck
  503. metrics.GCSListObjectsCompleted(err)
  504. return err
  505. }
  506. if !attrs.Deleted.IsZero() {
  507. continue
  508. }
  509. name, isDir := fs.resolve(attrs.Name, prefix)
  510. if name == "" {
  511. continue
  512. }
  513. if attrs.ContentType == dirMimeType {
  514. isDir = true
  515. }
  516. err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
  517. if err != nil {
  518. return err
  519. }
  520. }
  521. walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
  522. metrics.GCSListObjectsCompleted(err)
  523. return err
  524. }
  525. // Join joins any number of path elements into a single path
  526. func (*GCSFs) Join(elem ...string) string {
  527. return strings.TrimPrefix(path.Join(elem...), "/")
  528. }
  529. // HasVirtualFolders returns true if folders are emulated
  530. func (GCSFs) HasVirtualFolders() bool {
  531. return true
  532. }
  533. // ResolvePath returns the matching filesystem path for the specified virtual path
  534. func (fs *GCSFs) ResolvePath(virtualPath string) (string, error) {
  535. if !path.IsAbs(virtualPath) {
  536. virtualPath = path.Clean("/" + virtualPath)
  537. }
  538. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  539. }
  540. func (fs *GCSFs) resolve(name string, prefix string) (string, bool) {
  541. result := strings.TrimPrefix(name, prefix)
  542. isDir := strings.HasSuffix(result, "/")
  543. if isDir {
  544. result = strings.TrimSuffix(result, "/")
  545. }
  546. return result, isDir
  547. }
  548. func (fs *GCSFs) checkIfBucketExists() error {
  549. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  550. defer cancelFn()
  551. bkt := fs.svc.Bucket(fs.config.Bucket)
  552. _, err := bkt.Attrs(ctx)
  553. metrics.GCSHeadBucketCompleted(err)
  554. return err
  555. }
  556. func (fs *GCSFs) hasContents(name string) (bool, error) {
  557. result := false
  558. prefix := ""
  559. if name != "" && name != "." {
  560. prefix = strings.TrimPrefix(name, "/")
  561. if !strings.HasSuffix(prefix, "/") {
  562. prefix += "/"
  563. }
  564. }
  565. query := &storage.Query{Prefix: prefix}
  566. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  567. if err != nil {
  568. return result, err
  569. }
  570. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  571. defer cancelFn()
  572. bkt := fs.svc.Bucket(fs.config.Bucket)
  573. it := bkt.Objects(ctx, query)
  574. // if we have a dir object with a trailing slash it will be returned so we set the size to 2
  575. it.PageInfo().MaxSize = 2
  576. for {
  577. attrs, err := it.Next()
  578. if err == iterator.Done {
  579. break
  580. }
  581. if err != nil {
  582. metrics.GCSListObjectsCompleted(err)
  583. return result, err
  584. }
  585. name, _ := fs.resolve(attrs.Name, prefix)
  586. // a dir object with a trailing slash will result in an empty name
  587. if name == "/" || name == "" {
  588. continue
  589. }
  590. result = true
  591. break
  592. }
  593. metrics.GCSListObjectsCompleted(err)
  594. return result, nil
  595. }
  596. func (fs *GCSFs) getPrefix(name string) string {
  597. prefix := ""
  598. if name != "" && name != "." && name != "/" {
  599. prefix = strings.TrimPrefix(name, "/")
  600. if !strings.HasSuffix(prefix, "/") {
  601. prefix += "/"
  602. }
  603. }
  604. return prefix
  605. }
  606. func (fs *GCSFs) headObject(name string) (*storage.ObjectAttrs, error) {
  607. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  608. defer cancelFn()
  609. bkt := fs.svc.Bucket(fs.config.Bucket)
  610. obj := bkt.Object(name)
  611. attrs, err := obj.Attrs(ctx)
  612. metrics.GCSHeadObjectCompleted(err)
  613. return attrs, err
  614. }
// GetMimeType returns the content type stored in the attributes of the
// named object, or the error returned while fetching those attributes.
func (fs *GCSFs) GetMimeType(name string) (string, error) {
	attrs, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return attrs.ContentType, nil
}
// Close closes the fs.
// It currently does nothing and always returns nil.
// NOTE(review): the storage client held in fs.svc is not closed here —
// confirm whether it should be released when the Fs is closed.
func (fs *GCSFs) Close() error {
	return nil
}
// GetAvailableDiskSize returns the available size for the specified path.
// This implementation always returns ErrStorageSizeUnavailable.
func (*GCSFs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}