s3fs.go

// +build !nos3

package vfs

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/eikenb/pipeat"

	"github.com/drakkan/sftpgo/logger"
	"github.com/drakkan/sftpgo/metrics"
	"github.com/drakkan/sftpgo/utils"
)

// S3Fs is a Fs implementation for Amazon S3 compatible object storage.
type S3Fs struct {
	connectionID   string
	localTempDir   string
	config         S3FsConfig
	svc            *s3.S3
	ctxTimeout     time.Duration
	ctxLongTimeout time.Duration
}

func init() {
	utils.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage.
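//
// An illustrative usage sketch (the connection ID, bucket and region below
// are placeholders, not taken from the original source):
//
//	fs, err := NewS3Fs("connectionID", os.TempDir(), S3FsConfig{
//		Bucket: "my-bucket",
//		Region: "us-east-1",
//	})
//	if err != nil {
//		// handle the error
//	}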
func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
	fs := S3Fs{
		connectionID:   connectionID,
		localTempDir:   localTempDir,
		config:         config,
		ctxTimeout:     30 * time.Second,
		ctxLongTimeout: 300 * time.Second,
	}
	if err := ValidateS3FsConfig(&fs.config); err != nil {
		return fs, err
	}
	awsConfig := aws.NewConfig()
	if len(fs.config.Region) > 0 {
		awsConfig.WithRegion(fs.config.Region)
	}
	if len(fs.config.AccessSecret) > 0 {
		accessSecret, err := utils.DecryptData(fs.config.AccessSecret)
		if err != nil {
			return fs, err
		}
		fs.config.AccessSecret = accessSecret
		awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret, "")
	}
	if len(fs.config.Endpoint) > 0 {
		awsConfig.Endpoint = aws.String(fs.config.Endpoint)
		awsConfig.S3ForcePathStyle = aws.Bool(true)
	}
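	// The configured upload part size is expressed in MB and converted to
	// bytes here; zero values fall back to the s3manager default part size
	// and to 2 concurrent parts respectively.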
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
	} else {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = 2
	}
	sessOpts := session.Options{
		Config:            *awsConfig,
		SharedConfigState: session.SharedConfigEnable,
	}
	sess, err := session.NewSessionWithOptions(sessOpts)
	if err != nil {
		return fs, err
	}
	fs.svc = s3.New(sess)
	return fs, nil
}

// Name returns the name for the Fs implementation
func (fs S3Fs) Name() string {
	return fmt.Sprintf("S3Fs bucket: %#v", fs.config.Bucket)
}

// ConnectionID returns the SSH connection ID associated to this Fs implementation
func (fs S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
	var result FileInfo
	if name == "/" || name == "." {
		err := fs.checkIfBucketExists()
		if err != nil {
			return result, err
		}
		return NewFileInfo(name, true, 0, time.Time{}), nil
	}
	if "/"+fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Time{}), nil
	}
	prefix := path.Dir(name)
	if prefix == "/" || prefix == "." {
		prefix = ""
	} else {
		prefix = strings.TrimPrefix(prefix, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
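	// S3 has no real directories: look the name up by listing its parent
	// prefix with a "/" delimiter and matching either a common prefix
	// (a "directory") or an object key.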
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, p := range page.CommonPrefixes {
			if fs.isEqual(p.Prefix, name) {
				result = NewFileInfo(name, true, 0, time.Time{})
				return false
			}
		}
		for _, fileObject := range page.Contents {
			if fs.isEqual(fileObject.Key, name) {
				objectSize := *fileObject.Size
				objectModTime := *fileObject.LastModified
				isDir := strings.HasSuffix(*fileObject.Key, "/")
				result = NewFileInfo(name, isDir, objectSize, objectModTime)
				return false
			}
		}
		return true
	})
	metrics.S3ListObjectsCompleted(err)
	if err == nil && len(result.Name()) == 0 {
		err = errors.New("404 no such file or directory")
	}
	return result, err
}

// Lstat returns a FileInfo describing the named file
func (fs S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs S3Fs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := s3manager.NewDownloaderWithClient(fs.svc)
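	// The download runs in a background goroutine that writes into the pipe;
	// the caller can read the data from the returned PipeReaderAt while the
	// transfer is still in progress.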
	go func() {
		defer cancelFn()
		key := name
		n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(key),
		})
		w.CloseWithError(err) //nolint:errcheck // the returned error is always nil
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
		metrics.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}

// Create creates or opens the named file for writing
func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := s3manager.NewUploaderWithClient(fs.svc)
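	// The upload runs in a background goroutine that consumes the pipe
	// reader: data written by the client through the returned PipeWriter is
	// streamed to S3 as it arrives.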
	go func() {
		defer cancelFn()
		key := name
		response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(key),
			Body:         r,
			StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
		}, func(u *s3manager.Uploader) {
			u.Concurrency = fs.config.UploadConcurrency
			u.PartSize = fs.config.UploadPartSize
		})
		r.CloseWithError(err) //nolint:errcheck // the returned error is always nil
		p.Done(GetSFTPError(fs, err))
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, read bytes: %v, err: %+v",
			name, response, r.GetReadedBytes(), err)
		metrics.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take a long time: think
// about directories with thousands of files; for each file we would
// need to execute a CopyObject call.
// TODO: rename does not work for files bigger than 5GB, implement
// multipart copy or wait for this pull request to be merged:
//
// https://github.com/aws/aws-sdk-go/pull/2653
//
func (fs S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
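	// CopyObject expects the copy source expressed as "bucket/key", so join
	// the bucket name with the source path.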
	copySource := fs.Join(fs.config.Bucket, source)
	if fi.IsDir() {
		contents, err := fs.ReadDir(source)
		if err != nil {
			return err
		}
		if len(contents) > 0 {
			return fmt.Errorf("Cannot rename non empty directory: %#v", source)
		}
		if !strings.HasSuffix(copySource, "/") {
			copySource += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
		Bucket:     aws.String(fs.config.Bucket),
		CopySource: aws.String(copySource),
		Key:        aws.String(target),
	})
	metrics.S3CopyObjectCompleted(err)
	if err != nil {
		return err
	}
	return fs.Remove(source, fi.IsDir())
}

// Remove removes the named file or (empty) directory.
func (fs S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		contents, err := fs.ReadDir(name)
		if err != nil {
			return err
		}
		if len(contents) > 0 {
			return fmt.Errorf("Cannot remove non empty directory: %#v", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
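	// A "directory" is just a zero length object whose key ends with "/",
	// so removing it is a plain DeleteObject on that key.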
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metrics.S3DeleteObjectCompleted(err)
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
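	// The directory is represented by an empty object whose key ends with
	// "/", so create it as a zero byte upload.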
	_, w, _, err := fs.Create(name, 0)
	if err != nil {
		return err
	}
	return w.Close()
}

// Symlink creates source as a symbolic link to target.
func (S3Fs) Symlink(source, target string) error {
	return errors.New("403 symlinks are not supported")
}

// Chown changes the numeric uid and gid of the named file.
// Silently ignored.
func (S3Fs) Chown(name string, uid int, gid int) error {
	return nil
}

// Chmod changes the mode of the named file to mode.
// Silently ignored.
func (S3Fs) Chmod(name string, mode os.FileMode) error {
	return nil
}

// Chtimes changes the access and modification times of the named file.
// Not supported on S3: an error is returned.
func (S3Fs) Chtimes(name string, atime, mtime time.Time) error {
	return errors.New("403 chtimes is not supported")
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := ""
	if dirname != "/" && dirname != "." {
		prefix = strings.TrimPrefix(dirname, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
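	// List the prefix with a "/" delimiter: common prefixes become
	// subdirectories, object keys become files.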
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, p := range page.CommonPrefixes {
			name, isDir := fs.resolve(p.Prefix, prefix)
			result = append(result, NewFileInfo(name, isDir, 0, time.Time{}))
		}
		for _, fileObject := range page.Contents {
			objectSize := *fileObject.Size
			objectModTime := *fileObject.LastModified
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if len(name) == 0 {
				continue
			}
			result = append(result, NewFileInfo(name, isDir, objectSize, objectModTime))
		}
		return true
	})
	metrics.S3ListObjectsCompleted(err)
	return result, err
}

// IsUploadResumeSupported returns true if upload resume is supported.
// SFTP resume is not supported on S3.
func (S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file.
func (S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	if aerr, ok := err.(awserr.Error); ok {
		if aerr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if aerr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
	if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
		if multierr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if multierr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
	return strings.Contains(err.Error(), "404")
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "403")
}

// CheckRootPath creates the specified root directory if it does not exist
func (fs S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
	osFs.CheckRootPath(username, uid, gid)
	return fs.checkIfBucketExists() == nil
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs S3Fs) ScanRootDirContents() (int, int64, error) {
	numFiles := 0
	size := int64(0)
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(fs.config.KeyPrefix),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, fileObject := range page.Contents {
			numFiles++
			size += *fileObject.Size
		}
		return true
	})
	metrics.S3ListObjectsCompleted(err)
	return numFiles, size, err
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, errors.New("Not implemented")
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTP users
func (fs S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !strings.HasPrefix(rel, "/") {
		return "/" + rel
	}
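	// If a key prefix is configured the user's home is rooted at that
	// prefix, so strip it to get the path as seen by SFTP clients.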
	if len(fs.config.KeyPrefix) > 0 {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	return rel
}

// Join joins any number of path elements into a single path
func (S3Fs) Join(elem ...string) string {
	return path.Join(elem...)
}

// ResolvePath returns the matching filesystem path for the specified sftp path
func (fs S3Fs) ResolvePath(sftpPath string) (string, error) {
	if !path.IsAbs(sftpPath) {
		sftpPath = path.Clean("/" + sftpPath)
	}
	return fs.Join("/", fs.config.KeyPrefix, sftpPath), nil
}
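
// resolve strips the listing prefix from an object key and returns the first
// path component together with a flag indicating whether it refers to a
// "directory" (a common prefix or a key containing further path separators).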
func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(*name, prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	if strings.Contains(result, "/") {
		i := strings.Index(result, "/")
		isDir = true
		result = result[:i]
	}
	return result, isDir
}
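
// isEqual compares an S3 object key with an SFTP path, tolerating the
// leading "/" of SFTP paths and the trailing "/" used for directory
// placeholder objects.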
func (fs *S3Fs) isEqual(s3Key *string, sftpName string) bool {
	if *s3Key == sftpName {
		return true
	}
	if "/"+*s3Key == sftpName {
		return true
	}
	if "/"+*s3Key == sftpName+"/" {
		return true
	}
	return false
}
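
// checkIfBucketExists issues a HeadBucket request to verify that the
// configured bucket is reachable with the current credentials.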
func (fs *S3Fs) checkIfBucketExists() error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(fs.config.Bucket),
	})
	metrics.S3HeadBucketCompleted(err)
	return err
}