gcsfs.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. package vfs
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "os"
  9. "path"
  10. "strings"
  11. "time"
  12. "cloud.google.com/go/storage"
  13. "github.com/eikenb/pipeat"
  14. "google.golang.org/api/googleapi"
  15. "google.golang.org/api/iterator"
  16. "google.golang.org/api/option"
  17. "github.com/drakkan/sftpgo/logger"
  18. "github.com/drakkan/sftpgo/metrics"
  19. )
var (
	// we can use fields selection only when we don't need directory-like results
	// with folders.
	// Passed to Query.SetAttrSelection by ScanRootDirContents to limit the
	// object attributes returned by listings.
	gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated"}
)
// GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
type GCSFsConfig struct {
	// Bucket is the name of the GCS bucket to operate on
	Bucket string `json:"bucket,omitempty"`
	// KeyPrefix is similar to a chroot directory for local filesystem.
	// If specified the SFTP user will only see objects that starts with
	// this prefix and so you can restrict access to a specific virtual
	// folder. The prefix, if not empty, must not start with "/" and must
	// end with "/".
	// If empty the whole bucket contents will be available
	KeyPrefix string `json:"key_prefix,omitempty"`
	// CredentialFile is the path of the credentials file used when
	// AutomaticCredentials is disabled; never serialized to JSON
	CredentialFile string `json:"-"`
	// Credentials holds serialized credentials; not used directly in this
	// file — presumably consumed by validation/persistence code (verify)
	Credentials string `json:"credentials,omitempty"`
	// AutomaticCredentials > 0 means the storage client is created without
	// an explicit credentials file (GCP environment credentials)
	AutomaticCredentials int `json:"automatic_credentials,omitempty"`
	// StorageClass, if set, is applied to uploaded and copied objects
	StorageClass string `json:"storage_class,omitempty"`
}
// GCSFs is a Fs implementation for Google Cloud Storage.
type GCSFs struct {
	// ID of the SSH connection this Fs belongs to
	connectionID string
	// local directory used for temporary files (pipe backing storage)
	localTempDir string
	config       GCSFsConfig
	// GCS client, initialized in NewGCSFs
	svc *storage.Client
	// timeout used for short operations (Stat, Remove, Rename, listings)
	ctxTimeout time.Duration
	// timeout used for potentially long operations (ScanRootDirContents)
	ctxLongTimeout time.Duration
}
  49. // NewGCSFs returns an GCSFs object that allows to interact with Google Cloud Storage
  50. func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error) {
  51. var err error
  52. fs := GCSFs{
  53. connectionID: connectionID,
  54. localTempDir: localTempDir,
  55. config: config,
  56. ctxTimeout: 30 * time.Second,
  57. ctxLongTimeout: 300 * time.Second,
  58. }
  59. if err = ValidateGCSFsConfig(&fs.config, fs.config.CredentialFile); err != nil {
  60. return fs, err
  61. }
  62. ctx := context.Background()
  63. if fs.config.AutomaticCredentials > 0 {
  64. fs.svc, err = storage.NewClient(ctx)
  65. } else {
  66. fs.svc, err = storage.NewClient(ctx, option.WithCredentialsFile(fs.config.CredentialFile))
  67. }
  68. return fs, err
  69. }
  70. // Name returns the name for the Fs implementation
  71. func (fs GCSFs) Name() string {
  72. return fmt.Sprintf("GCSFs bucket: %#v", fs.config.Bucket)
  73. }
// ConnectionID returns the SSH connection ID associated to this Fs implementation
func (fs GCSFs) ConnectionID() string {
	return fs.connectionID
}
// Stat returns a FileInfo describing the named file.
// GCS has no real directories: a name is treated as a directory when a
// listing of its parent returns a synthetic prefix (or an object whose key
// ends with "/") matching it.
func (fs GCSFs) Stat(name string) (os.FileInfo, error) {
	var result FileInfo
	var err error
	if len(name) == 0 || name == "." {
		// the virtual root maps to the bucket itself
		err := fs.checkIfBucketExists()
		if err != nil {
			return result, err
		}
		return NewFileInfo(name, true, 0, time.Time{}), nil
	}
	if fs.config.KeyPrefix == name+"/" {
		// the configured virtual folder root always exists
		return NewFileInfo(name, true, 0, time.Time{}), nil
	}
	// list the parent "directory" and search for an entry matching name
	prefix := fs.getPrefixForStat(name)
	query := &storage.Query{Prefix: prefix, Delimiter: "/"}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	bkt := fs.svc.Bucket(fs.config.Bucket)
	it := bkt.Objects(ctx, query)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			metrics.GCSListObjectsCompleted(err)
			return result, err
		}
		if len(attrs.Prefix) > 0 {
			// synthetic prefix entry: a match means name is a directory
			if fs.isEqual(attrs.Prefix, name) {
				result = NewFileInfo(name, true, 0, time.Time{})
			}
		} else {
			// skip entries flagged as deleted
			if !attrs.Deleted.IsZero() {
				continue
			}
			if fs.isEqual(attrs.Name, name) {
				// an object key with a trailing "/" is a directory placeholder
				isDir := strings.HasSuffix(attrs.Name, "/")
				result = NewFileInfo(name, isDir, attrs.Size, attrs.Updated)
			}
		}
	}
	metrics.GCSListObjectsCompleted(nil)
	if len(result.Name()) == 0 {
		// nothing matched: "404" in the message is recognized by IsNotExist
		err = errors.New("404 no such file or directory")
	}
	return result, err
}
  127. // Lstat returns a FileInfo describing the named file
  128. func (fs GCSFs) Lstat(name string) (os.FileInfo, error) {
  129. return fs.Stat(name)
  130. }
// Open opens the named file for reading.
// The object is downloaded by a background goroutine and streamed through
// a pipe created in the local temp dir, so the returned *pipeat.PipeReaderAt
// supports positioned reads. The returned func cancels the download context.
func (fs GCSFs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	bkt := fs.svc.Bucket(fs.config.Bucket)
	obj := bkt.Object(name)
	ctx, cancelFn := context.WithCancel(context.Background())
	objectReader, err := obj.NewReader(ctx)
	if err != nil {
		// release both pipe ends and the context before reporting failure
		r.Close()
		w.Close()
		cancelFn()
		return nil, nil, nil, err
	}
	go func() {
		defer cancelFn()
		defer objectReader.Close()
		n, err := io.Copy(w, objectReader)
		// propagate the copy error (if any) to the reader side
		w.CloseWithError(err) //nolint:errcheck // the returned error is always null
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
		metrics.GCSTransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing.
// Data written to the returned *pipeat.PipeWriterAt is uploaded to the
// object by a background goroutine. The flag argument is currently unused.
// The returned func cancels the upload context.
func (fs GCSFs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	bkt := fs.svc.Bucket(fs.config.Bucket)
	obj := bkt.Object(name)
	ctx, cancelFn := context.WithCancel(context.Background())
	objectWriter := obj.NewWriter(ctx)
	if len(fs.config.StorageClass) > 0 {
		// apply the configured storage class to the new object
		objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass
	}
	go func() {
		defer cancelFn()
		defer objectWriter.Close()
		n, err := io.Copy(objectWriter, r)
		// propagate the copy error (if any) to the writer side
		r.CloseWithError(err) //nolint:errcheck // the returned error is always null
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, n, err)
		metrics.GCSTransferCompleted(n, 0, err)
	}()
	return nil, w, cancelFn, nil
}
// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a CopyObject call.
// The rename is implemented as a server-side copy followed by removal
// of the source.
func (fs GCSFs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	if fi.IsDir() {
		contents, err := fs.ReadDir(source)
		if err != nil {
			return err
		}
		if len(contents) > 0 {
			return fmt.Errorf("Cannot rename non empty directory: %#v", source)
		}
		// directory placeholder objects are keyed with a trailing "/"
		if !strings.HasSuffix(source, "/") {
			source += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	src := fs.svc.Bucket(fs.config.Bucket).Object(source)
	dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	copier := dst.CopierFrom(src)
	if len(fs.config.StorageClass) > 0 {
		copier.StorageClass = fs.config.StorageClass
	}
	_, err = copier.Run(ctx)
	metrics.GCSCopyObjectCompleted(err)
	if err != nil {
		return err
	}
	// copy succeeded: delete the original object
	return fs.Remove(source, fi.IsDir())
}
  223. // Remove removes the named file or (empty) directory.
  224. func (fs GCSFs) Remove(name string, isDir bool) error {
  225. if isDir {
  226. contents, err := fs.ReadDir(name)
  227. if err != nil {
  228. return err
  229. }
  230. if len(contents) > 0 {
  231. return fmt.Errorf("Cannot remove non empty directory: %#v", name)
  232. }
  233. if !strings.HasSuffix(name, "/") {
  234. name += "/"
  235. }
  236. }
  237. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  238. defer cancelFn()
  239. err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
  240. metrics.GCSDeleteObjectCompleted(err)
  241. return err
  242. }
  243. // Mkdir creates a new directory with the specified name and default permissions
  244. func (fs GCSFs) Mkdir(name string) error {
  245. _, err := fs.Stat(name)
  246. if !fs.IsNotExist(err) {
  247. return err
  248. }
  249. if !strings.HasSuffix(name, "/") {
  250. name += "/"
  251. }
  252. _, w, _, err := fs.Create(name, 0)
  253. if err != nil {
  254. return err
  255. }
  256. return w.Close()
  257. }
// Symlink creates source as a symbolic link to target.
// Not supported: an error is always returned ("403" in the message is
// recognized by IsPermission).
func (GCSFs) Symlink(source, target string) error {
	return errors.New("403 symlinks are not supported")
}
// Chown changes the numeric uid and gid of the named file.
// Silently ignored: it is a no-op that always returns nil.
func (GCSFs) Chown(name string, uid int, gid int) error {
	return nil
}
// Chmod changes the mode of the named file to mode.
// Silently ignored: it is a no-op that always returns nil.
func (GCSFs) Chmod(name string, mode os.FileMode) error {
	return nil
}
// Chtimes changes the access and modification times of the named file.
// Not supported: an error is always returned ("403" in the message is
// recognized by IsPermission). Note: the previous comment said "silently
// ignored", which did not match the implementation.
func (GCSFs) Chtimes(name string, atime, mtime time.Time) error {
	return errors.New("403 chtimes is not supported")
}
// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := ""
	if len(dirname) > 0 && dirname != "." {
		prefix = strings.TrimPrefix(dirname, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	// the "/" delimiter makes the listing non-recursive: deeper objects are
	// collapsed into synthetic prefix entries
	query := &storage.Query{Prefix: prefix, Delimiter: "/"}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	bkt := fs.svc.Bucket(fs.config.Bucket)
	it := bkt.Objects(ctx, query)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			metrics.GCSListObjectsCompleted(err)
			return result, err
		}
		if len(attrs.Prefix) > 0 {
			// synthetic prefix entry: report it as a directory
			name, _ := fs.resolve(attrs.Prefix, prefix)
			result = append(result, NewFileInfo(name, true, 0, time.Time{}))
		} else {
			name, isDir := fs.resolve(attrs.Name, prefix)
			if len(name) == 0 {
				// the object key equals the prefix itself (directory
				// placeholder for dirname): skip it
				continue
			}
			// skip entries flagged as deleted
			if !attrs.Deleted.IsZero() {
				continue
			}
			result = append(result, NewFileInfo(name, isDir, attrs.Size, attrs.Updated))
		}
	}
	metrics.GCSListObjectsCompleted(nil)
	return result, nil
}
// IsUploadResumeSupported returns true if upload resume is supported.
// SFTP resume is not supported on GCS.
func (GCSFs) IsUploadResumeSupported() bool {
	return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// GCS uploads are already atomic, we don't need to upload to a temporary
// file, so this returns false.
func (GCSFs) IsAtomicUploadSupported() bool {
	return false
}
  331. // IsNotExist returns a boolean indicating whether the error is known to
  332. // report that a file or directory does not exist
  333. func (GCSFs) IsNotExist(err error) bool {
  334. if err == nil {
  335. return false
  336. }
  337. if err == storage.ErrObjectNotExist || err == storage.ErrBucketNotExist {
  338. return true
  339. }
  340. if e, ok := err.(*googleapi.Error); ok {
  341. if e.Code == http.StatusNotFound {
  342. return true
  343. }
  344. }
  345. return strings.Contains(err.Error(), "404")
  346. }
  347. // IsPermission returns a boolean indicating whether the error is known to
  348. // report that permission is denied.
  349. func (GCSFs) IsPermission(err error) bool {
  350. if err == nil {
  351. return false
  352. }
  353. if e, ok := err.(*googleapi.Error); ok {
  354. if e.Code == http.StatusForbidden || e.Code == http.StatusUnauthorized {
  355. return true
  356. }
  357. }
  358. return strings.Contains(err.Error(), "403")
  359. }
  360. // CheckRootPath creates the specified root directory if it does not exists
  361. func (fs GCSFs) CheckRootPath(username string, uid int, gid int) bool {
  362. // we need a local directory for temporary files
  363. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
  364. osFs.CheckRootPath(username, uid, gid)
  365. return fs.checkIfBucketExists() != nil
  366. }
// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs GCSFs) ScanRootDirContents() (int, int64, error) {
	numFiles := 0
	size := int64(0)
	// no delimiter: the listing is recursive under the key prefix
	query := &storage.Query{Prefix: fs.config.KeyPrefix}
	// limit the returned attributes to the fields we need
	err := query.SetAttrSelection(gcsDefaultFieldsSelection)
	if err != nil {
		return numFiles, size, err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
	defer cancelFn()
	bkt := fs.svc.Bucket(fs.config.Bucket)
	it := bkt.Objects(ctx, query)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			metrics.GCSListObjectsCompleted(err)
			return numFiles, size, err
		}
		// skip entries flagged as deleted
		if !attrs.Deleted.IsZero() {
			continue
		}
		numFiles++
		size += attrs.Size
	}
	metrics.GCSListObjectsCompleted(nil)
	// the outer err is always nil here: iteration errors return early above
	return numFiles, size, err
}
// GetAtomicUploadPath returns the path to use for an atomic upload.
// GCS uploads are already atomic, we never call this method for GCS
func (GCSFs) GetAtomicUploadPath(name string) string {
	return ""
}
  404. // GetRelativePath returns the path for a file relative to the user's home dir.
  405. // This is the path as seen by SFTP users
  406. func (fs GCSFs) GetRelativePath(name string) string {
  407. rel := path.Clean(name)
  408. if rel == "." {
  409. rel = ""
  410. }
  411. if !path.IsAbs(rel) {
  412. rel = "/" + rel
  413. }
  414. if len(fs.config.KeyPrefix) > 0 {
  415. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  416. rel = "/"
  417. }
  418. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  419. }
  420. return rel
  421. }
  422. // Join joins any number of path elements into a single path
  423. func (GCSFs) Join(elem ...string) string {
  424. return strings.TrimPrefix(path.Join(elem...), "/")
  425. }
  426. // ResolvePath returns the matching filesystem path for the specified sftp path
  427. func (fs GCSFs) ResolvePath(sftpPath string) (string, error) {
  428. if !path.IsAbs(sftpPath) {
  429. sftpPath = path.Clean("/" + sftpPath)
  430. }
  431. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(sftpPath, "/")), nil
  432. }
  433. func (fs *GCSFs) resolve(name string, prefix string) (string, bool) {
  434. result := strings.TrimPrefix(name, prefix)
  435. isDir := strings.HasSuffix(result, "/")
  436. if isDir {
  437. result = strings.TrimSuffix(result, "/")
  438. }
  439. return result, isDir
  440. }
  441. func (fs *GCSFs) isEqual(key string, sftpName string) bool {
  442. if key == sftpName {
  443. return true
  444. }
  445. if key == sftpName+"/" {
  446. return true
  447. }
  448. if key+"/" == sftpName {
  449. return true
  450. }
  451. return false
  452. }
// checkIfBucketExists requests the configured bucket's attributes and
// returns the resulting error: nil means the bucket is accessible.
func (fs *GCSFs) checkIfBucketExists() error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	bkt := fs.svc.Bucket(fs.config.Bucket)
	_, err := bkt.Attrs(ctx)
	metrics.GCSHeadBucketCompleted(err)
	return err
}
  461. func (fs *GCSFs) getPrefixForStat(name string) string {
  462. prefix := path.Dir(name)
  463. if prefix == "/" || prefix == "." || len(prefix) == 0 {
  464. prefix = ""
  465. } else {
  466. prefix = strings.TrimPrefix(prefix, "/")
  467. if !strings.HasSuffix(prefix, "/") {
  468. prefix += "/"
  469. }
  470. }
  471. return prefix
  472. }