s3fs.go

// +build !nos3

package vfs

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/eikenb/pipeat"

	"github.com/drakkan/sftpgo/logger"
	"github.com/drakkan/sftpgo/metrics"
	"github.com/drakkan/sftpgo/utils"
	"github.com/drakkan/sftpgo/version"
)

// S3Fs is a Fs implementation for Amazon S3 compatible object storage.
type S3Fs struct {
	connectionID   string
	localTempDir   string
	config         S3FsConfig
	svc            *s3.S3
	ctxTimeout     time.Duration
	ctxLongTimeout time.Duration
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
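//
// Example (illustrative sketch; assumes cfg is an already populated, valid
// S3FsConfig and that /tmp is usable for temporary files):
//
//	fs, err := NewS3Fs("connection-id", "/tmp", cfg)
//	if err != nil {
//		// handle the configuration/session error
//	}
//	_ = fs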
func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
	fs := S3Fs{
		connectionID:   connectionID,
		localTempDir:   localTempDir,
		config:         config,
		ctxTimeout:     30 * time.Second,
		ctxLongTimeout: 300 * time.Second,
	}
	if err := ValidateS3FsConfig(&fs.config); err != nil {
		return fs, err
	}
	awsConfig := aws.NewConfig()
	if len(fs.config.Region) > 0 {
		awsConfig.WithRegion(fs.config.Region)
	}
	if len(fs.config.AccessSecret) > 0 {
		accessSecret, err := utils.DecryptData(fs.config.AccessSecret)
		if err != nil {
			return fs, err
		}
		fs.config.AccessSecret = accessSecret
		awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret, "")
	}
	if len(fs.config.Endpoint) > 0 {
		awsConfig.Endpoint = aws.String(fs.config.Endpoint)
		awsConfig.S3ForcePathStyle = aws.Bool(true)
	}
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
	} else {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = 2
	}
	sessOpts := session.Options{
		Config:            *awsConfig,
		SharedConfigState: session.SharedConfigEnable,
	}
	sess, err := session.NewSessionWithOptions(sessOpts)
	if err != nil {
		return fs, err
	}
	fs.svc = s3.New(sess)
	return fs, nil
}

// Name returns the name for the Fs implementation
func (fs S3Fs) Name() string {
	return fmt.Sprintf("S3Fs bucket: %#v", fs.config.Bucket)
}

// ConnectionID returns the SSH connection ID associated with this Fs implementation
func (fs S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
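//
// Since S3 has no real directories, Stat lists the parent prefix with a "/"
// delimiter and treats a matching CommonPrefix (or a key ending in "/") as a
// directory, while a matching object key is reported as a regular file.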
func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
	var result FileInfo
	if name == "/" || name == "." {
		err := fs.checkIfBucketExists()
		if err != nil {
			return result, err
		}
		return NewFileInfo(name, true, 0, time.Time{}), nil
	}
	if "/"+fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Time{}), nil
	}
	prefix := path.Dir(name)
	if prefix == "/" || prefix == "." {
		prefix = ""
	} else {
		prefix = strings.TrimPrefix(prefix, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, p := range page.CommonPrefixes {
			if fs.isEqual(p.Prefix, name) {
				result = NewFileInfo(name, true, 0, time.Time{})
				return false
			}
		}
		for _, fileObject := range page.Contents {
			if fs.isEqual(fileObject.Key, name) {
				objectSize := *fileObject.Size
				objectModTime := *fileObject.LastModified
				isDir := strings.HasSuffix(*fileObject.Key, "/")
				result = NewFileInfo(name, isDir, objectSize, objectModTime)
				return false
			}
		}
		return true
	})
	metrics.S3ListObjectsCompleted(err)
	if err == nil && len(result.Name()) == 0 {
		err = errors.New("404 no such file or directory")
	}
	return result, err
}

// Lstat returns a FileInfo describing the named file
func (fs S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
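//
// The download is streamed: the object is written to one end of a pipeat pipe
// in a background goroutine while the returned PipeReaderAt is consumed by the
// caller. A non zero offset is honored via an HTTP Range request
// ("bytes=<offset>-"), and the returned cancel function aborts the transfer.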
func (fs S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := s3manager.NewDownloaderWithClient(fs.svc)
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		w.CloseWithError(err) //nolint:errcheck // the returned error is always nil
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
		metrics.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}

// Create creates or opens the named file for writing
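//
// Uploads are streamed as well: the caller writes to the returned PipeWriter
// while a background goroutine feeds the reader side to an s3manager uploader,
// using the configured part size and concurrency for multipart uploads.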
func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := s3manager.NewUploaderWithClient(fs.svc)
	go func() {
		defer cancelFn()
		key := name
		response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(key),
			Body:         r,
			StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
		}, func(u *s3manager.Uploader) {
			u.Concurrency = fs.config.UploadConcurrency
			u.PartSize = fs.config.UploadPartSize
		})
		r.CloseWithError(err) //nolint:errcheck // the returned error is always nil
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, read bytes: %v, err: %+v",
			name, response, r.GetReadedBytes(), err)
		metrics.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
// We don't support renaming non-empty directories since we would have to
// rename all the contents too and this could take a long time: think of
// directories with thousands of files; for each file we would have to
// execute a CopyObject call.
// TODO: rename does not work for files bigger than 5GB; implement
// multipart copy or wait for this pull request to be merged:
//
// https://github.com/aws/aws-sdk-go/pull/2653
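//
// The move itself is implemented as a server-side CopyObject to the target key
// followed by a Remove of the source key, so there is a short window where
// both keys exist.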
func (fs S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	copySource := fs.Join(fs.config.Bucket, source)
	if fi.IsDir() {
		contents, err := fs.ReadDir(source)
		if err != nil {
			return err
		}
		if len(contents) > 0 {
			return fmt.Errorf("Cannot rename non empty directory: %#v", source)
		}
		if !strings.HasSuffix(copySource, "/") {
			copySource += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
		Bucket:     aws.String(fs.config.Bucket),
		CopySource: aws.String(copySource),
		Key:        aws.String(target),
	})
	metrics.S3CopyObjectCompleted(err)
	if err != nil {
		return err
	}
	return fs.Remove(source, fi.IsDir())
}

// Remove removes the named file or (empty) directory.
func (fs S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		contents, err := fs.ReadDir(name)
		if err != nil {
			return err
		}
		if len(contents) > 0 {
			return fmt.Errorf("Cannot remove non empty directory: %#v", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metrics.S3DeleteObjectCompleted(err)
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
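//
// Since folders are only emulated, the directory is created by uploading a
// zero-length object whose key ends with "/".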
func (fs S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	_, w, _, err := fs.Create(name, 0)
	if err != nil {
		return err
	}
	return w.Close()
}

// Symlink creates source as a symbolic link to target.
func (S3Fs) Symlink(source, target string) error {
	return errors.New("403 symlinks are not supported")
}

// Chown changes the numeric uid and gid of the named file.
// Silently ignored.
func (S3Fs) Chown(name string, uid int, gid int) error {
	return nil
}

// Chmod changes the mode of the named file to mode.
// Silently ignored.
func (S3Fs) Chmod(name string, mode os.FileMode) error {
	return nil
}

// Chtimes changes the access and modification times of the named file.
// Not supported on S3, an error is always returned.
func (S3Fs) Chtimes(name string, atime, mtime time.Time) error {
	return errors.New("403 chtimes is not supported")
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
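//
// Listing uses the "/" delimiter so that only the direct children of dirname
// are returned: CommonPrefixes become virtual directories and object keys
// become files.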
func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := ""
	if dirname != "/" && dirname != "." {
		prefix = strings.TrimPrefix(dirname, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, p := range page.CommonPrefixes {
			name, isDir := fs.resolve(p.Prefix, prefix)
			result = append(result, NewFileInfo(name, isDir, 0, time.Time{}))
		}
		for _, fileObject := range page.Contents {
			objectSize := *fileObject.Size
			objectModTime := *fileObject.LastModified
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if len(name) == 0 {
				continue
			}
			result = append(result, NewFileInfo(name, isDir, objectSize, objectModTime))
		}
		return true
	})
	metrics.S3ListObjectsCompleted(err)
	return result, err
}

// IsUploadResumeSupported returns true if upload resume is supported.
// SFTP Resume is not supported on S3
func (S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
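//
// Both plain AWS errors and multipart upload failures are inspected for the
// NoSuchKey/NoSuchBucket codes; as a fallback the error text is checked for a
// "404" status.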
func (S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	if aerr, ok := err.(awserr.Error); ok {
		if aerr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if aerr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
	if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
		if multierr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if multierr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
	return strings.Contains(err.Error(), "404")
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "403")
}

// CheckRootPath creates the specified root directory if it does not exist
func (fs S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
	osFs.CheckRootPath(username, uid, gid)
	return fs.checkIfBucketExists() == nil
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
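//
// The scan lists every key under the configured KeyPrefix (no delimiter), so
// it may take a while on large buckets; the longer context timeout is used.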
func (fs S3Fs) ScanRootDirContents() (int, int64, error) {
	numFiles := 0
	size := int64(0)
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(fs.config.KeyPrefix),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, fileObject := range page.Contents {
			numFiles++
			size += *fileObject.Size
		}
		return true
	})
	metrics.S3ListObjectsCompleted(err)
	return numFiles, size, err
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, errUnsupported
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTP users
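//
// Example (illustrative values): with KeyPrefix "homes/user1/", the resolved
// path "/homes/user1/docs/file.txt" maps to "/docs/file.txt", while a path
// outside the prefix collapses to "/".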
func (fs S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !strings.HasPrefix(rel, "/") {
		return "/" + rel
	}
	if len(fs.config.KeyPrefix) > 0 {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root
func (S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	return errUnsupported
}

// Join joins any number of path elements into a single path
func (S3Fs) Join(elem ...string) string {
	return path.Join(elem...)
}

// HasVirtualFolders returns true if folders are emulated
func (S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified sftp path
func (fs S3Fs) ResolvePath(sftpPath string) (string, error) {
	if !path.IsAbs(sftpPath) {
		sftpPath = path.Clean("/" + sftpPath)
	}
	return fs.Join("/", fs.config.KeyPrefix, sftpPath), nil
}

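// resolve returns the name of the listed entry relative to the given prefix
// and whether it represents a directory (an explicit "name/" key or an
// implicit parent of a deeper key).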
func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(*name, prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	if strings.Contains(result, "/") {
		i := strings.Index(result, "/")
		isDir = true
		result = result[:i]
	}
	return result, isDir
}

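// isEqual reports whether the given S3 key refers to the same entry as the
// SFTP name, accounting for the leading "/" and for the trailing "/" used by
// virtual directories.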
func (fs *S3Fs) isEqual(s3Key *string, sftpName string) bool {
	if *s3Key == sftpName {
		return true
	}
	if "/"+*s3Key == sftpName {
		return true
	}
	if "/"+*s3Key == sftpName+"/" {
		return true
	}
	return false
}

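// checkIfBucketExists verifies that the configured bucket is reachable with
// the current credentials by issuing a HeadBucket request.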
func (fs *S3Fs) checkIfBucketExists() error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(fs.config.Bucket),
	})
	metrics.S3HeadBucketCompleted(err)
	return err
}