s3fs.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. package vfs
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/aws/aws-sdk-go/aws"
  11. "github.com/aws/aws-sdk-go/aws/awserr"
  12. "github.com/aws/aws-sdk-go/aws/credentials"
  13. "github.com/aws/aws-sdk-go/aws/session"
  14. "github.com/aws/aws-sdk-go/service/s3"
  15. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  16. "github.com/drakkan/sftpgo/logger"
  17. "github.com/drakkan/sftpgo/metrics"
  18. "github.com/drakkan/sftpgo/utils"
  19. "github.com/eikenb/pipeat"
  20. )
  // S3FsConfig groups the settings used to expose an S3 (or S3 compatible)
  // bucket as a filesystem.
  type S3FsConfig struct {
  	// Bucket is the name of the bucket that every request targets.
  	Bucket string `json:"bucket,omitempty"`
  	// KeyPrefix plays the role that a chroot directory has for a local
  	// filesystem: when set, the SFTP user only sees the objects whose key
  	// starts with this prefix, which restricts access to a single virtual
  	// folder. A non-empty prefix must not start with "/" and must end
  	// with "/". When empty, the whole bucket content is available.
  	KeyPrefix string `json:"key_prefix,omitempty"`
  	// Region, when not empty, is set on the AWS client configuration.
  	Region string `json:"region,omitempty"`
  	// AccessKey is combined with AccessSecret to build static credentials.
  	// NewS3Fs only reads it when AccessSecret is not empty.
  	AccessKey string `json:"access_key,omitempty"`
  	// AccessSecret is the stored secret. NewS3Fs passes it through
  	// utils.DecryptData before handing it to the AWS SDK.
  	AccessSecret string `json:"access_secret,omitempty"`
  	// Endpoint, when not empty, replaces the service endpoint; NewS3Fs also
  	// forces path-style addressing in that case.
  	Endpoint string `json:"endpoint,omitempty"`
  	// StorageClass is sent along with every upload.
  	StorageClass string `json:"storage_class,omitempty"`
  	// UploadPartSize is the buffer size, in MB, used for multipart uploads.
  	// The smallest part size allowed is 5MB, so the minimum allowed value
  	// is 5. Zero selects the AWS SDK default (5MB).
  	// Keep in mind that when the SFTP client uploads to SFTPGo faster than
  	// SFTPGo uploads to S3, the client has to wait for the last parts to
  	// reach S3 after it has finished sending the file, and it may time out.
  	UploadPartSize int64 `json:"upload_part_size,omitempty"`
  	// UploadConcurrency is the number of parts uploaded in parallel.
  	// NewS3Fs replaces zero with 2.
  	UploadConcurrency int `json:"upload_concurrency,omitempty"`
  }
  47. // S3Fs is a Fs implementation for Amazon S3 compatible object storage.
  48. type S3Fs struct {
  49. connectionID string
  50. localTempDir string
  51. config S3FsConfig
  52. svc *s3.S3
  53. ctxTimeout time.Duration
  54. ctxLongTimeout time.Duration
  55. }
  56. // NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
  57. // object storage
  58. func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
  59. fs := S3Fs{
  60. connectionID: connectionID,
  61. localTempDir: localTempDir,
  62. config: config,
  63. ctxTimeout: 30 * time.Second,
  64. ctxLongTimeout: 300 * time.Second,
  65. }
  66. if err := ValidateS3FsConfig(&fs.config); err != nil {
  67. return fs, err
  68. }
  69. awsConfig := aws.NewConfig()
  70. if len(fs.config.Region) > 0 {
  71. awsConfig.WithRegion(fs.config.Region)
  72. }
  73. if len(fs.config.AccessSecret) > 0 {
  74. accessSecret, err := utils.DecryptData(fs.config.AccessSecret)
  75. if err != nil {
  76. return fs, err
  77. }
  78. fs.config.AccessSecret = accessSecret
  79. awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret, "")
  80. }
  81. if len(fs.config.Endpoint) > 0 {
  82. awsConfig.Endpoint = aws.String(fs.config.Endpoint)
  83. awsConfig.S3ForcePathStyle = aws.Bool(true)
  84. }
  85. if fs.config.UploadPartSize == 0 {
  86. fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
  87. } else {
  88. fs.config.UploadPartSize *= 1024 * 1024
  89. }
  90. if fs.config.UploadConcurrency == 0 {
  91. fs.config.UploadConcurrency = 2
  92. }
  93. sessOpts := session.Options{
  94. Config: *awsConfig,
  95. SharedConfigState: session.SharedConfigEnable,
  96. }
  97. sess, err := session.NewSessionWithOptions(sessOpts)
  98. if err != nil {
  99. return fs, err
  100. }
  101. fs.svc = s3.New(sess)
  102. return fs, nil
  103. }
  104. // Name returns the name for the Fs implementation
  105. func (fs S3Fs) Name() string {
  106. return fmt.Sprintf("S3Fs bucket: %#v", fs.config.Bucket)
  107. }
  108. // ConnectionID returns the SSH connection ID associated to this Fs implementation
  109. func (fs S3Fs) ConnectionID() string {
  110. return fs.connectionID
  111. }
  112. // Stat returns a FileInfo describing the named file
  113. func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
  114. var result FileInfo
  115. if name == "/" || name == "." {
  116. err := fs.checkIfBucketExists()
  117. if err != nil {
  118. return result, err
  119. }
  120. return NewFileInfo(name, true, 0, time.Time{}), nil
  121. }
  122. if "/"+fs.config.KeyPrefix == name+"/" {
  123. return NewFileInfo(name, true, 0, time.Time{}), nil
  124. }
  125. prefix := path.Dir(name)
  126. if prefix == "/" || prefix == "." {
  127. prefix = ""
  128. } else {
  129. prefix = strings.TrimPrefix(prefix, "/")
  130. if !strings.HasSuffix(prefix, "/") {
  131. prefix += "/"
  132. }
  133. }
  134. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  135. defer cancelFn()
  136. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  137. Bucket: aws.String(fs.config.Bucket),
  138. Prefix: aws.String(prefix),
  139. Delimiter: aws.String("/"),
  140. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  141. for _, p := range page.CommonPrefixes {
  142. if fs.isEqual(p.Prefix, name) {
  143. result = NewFileInfo(name, true, 0, time.Time{})
  144. return false
  145. }
  146. }
  147. for _, fileObject := range page.Contents {
  148. if fs.isEqual(fileObject.Key, name) {
  149. objectSize := *fileObject.Size
  150. objectModTime := *fileObject.LastModified
  151. isDir := strings.HasSuffix(*fileObject.Key, "/")
  152. result = NewFileInfo(name, isDir, objectSize, objectModTime)
  153. return false
  154. }
  155. }
  156. return true
  157. })
  158. metrics.S3ListObjectsCompleted(err)
  159. if err == nil && len(result.Name()) == 0 {
  160. err = errors.New("404 no such file or directory")
  161. }
  162. return result, err
  163. }
  164. // Lstat returns a FileInfo describing the named file
  165. func (fs S3Fs) Lstat(name string) (os.FileInfo, error) {
  166. return fs.Stat(name)
  167. }
  168. // Open opens the named file for reading
  169. func (fs S3Fs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error) {
  170. r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
  171. if err != nil {
  172. return nil, nil, nil, err
  173. }
  174. ctx, cancelFn := context.WithCancel(context.Background())
  175. downloader := s3manager.NewDownloaderWithClient(fs.svc)
  176. go func() {
  177. defer cancelFn()
  178. key := name
  179. n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
  180. Bucket: aws.String(fs.config.Bucket),
  181. Key: aws.String(key),
  182. })
  183. w.CloseWithError(err)
  184. fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
  185. metrics.S3TransferCompleted(n, 1, err)
  186. }()
  187. return nil, r, cancelFn, nil
  188. }
  189. // Create creates or opens the named file for writing
  190. func (fs S3Fs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, func(), error) {
  191. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  192. if err != nil {
  193. return nil, nil, nil, err
  194. }
  195. ctx, cancelFn := context.WithCancel(context.Background())
  196. uploader := s3manager.NewUploaderWithClient(fs.svc)
  197. go func() {
  198. defer cancelFn()
  199. key := name
  200. response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
  201. Bucket: aws.String(fs.config.Bucket),
  202. Key: aws.String(key),
  203. Body: r,
  204. StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
  205. }, func(u *s3manager.Uploader) {
  206. u.Concurrency = fs.config.UploadConcurrency
  207. u.PartSize = fs.config.UploadPartSize
  208. })
  209. r.CloseWithError(err)
  210. fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, readed bytes: %v, err: %+v",
  211. name, response, r.GetReadedBytes(), err)
  212. metrics.S3TransferCompleted(r.GetReadedBytes(), 0, err)
  213. }()
  214. return nil, w, cancelFn, nil
  215. }
  216. // Rename renames (moves) source to target.
  217. // We don't support renaming non empty directories since we should
  218. // rename all the contents too and this could take long time: think
  219. // about directories with thousands of files, for each file we should
  220. // execute a CopyObject call.
  221. // TODO: rename does not work for files bigger than 5GB, implement
  222. // multipart copy or wait for this pull request to be merged:
  223. //
  224. // https://github.com/aws/aws-sdk-go/pull/2653
  225. //
  226. func (fs S3Fs) Rename(source, target string) error {
  227. if source == target {
  228. return nil
  229. }
  230. fi, err := fs.Stat(source)
  231. if err != nil {
  232. return err
  233. }
  234. copySource := fs.Join(fs.config.Bucket, source)
  235. if fi.IsDir() {
  236. contents, err := fs.ReadDir(source)
  237. if err != nil {
  238. return err
  239. }
  240. if len(contents) > 0 {
  241. return fmt.Errorf("Cannot rename non empty directory: %#v", source)
  242. }
  243. if !strings.HasSuffix(copySource, "/") {
  244. copySource += "/"
  245. }
  246. if !strings.HasSuffix(target, "/") {
  247. target += "/"
  248. }
  249. }
  250. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  251. defer cancelFn()
  252. _, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
  253. Bucket: aws.String(fs.config.Bucket),
  254. CopySource: aws.String(copySource),
  255. Key: aws.String(target),
  256. })
  257. metrics.S3CopyObjectCompleted(err)
  258. if err != nil {
  259. return err
  260. }
  261. return fs.Remove(source, fi.IsDir())
  262. }
  263. // Remove removes the named file or (empty) directory.
  264. func (fs S3Fs) Remove(name string, isDir bool) error {
  265. if isDir {
  266. contents, err := fs.ReadDir(name)
  267. if err != nil {
  268. return err
  269. }
  270. if len(contents) > 0 {
  271. return fmt.Errorf("Cannot remove non empty directory: %#v", name)
  272. }
  273. if !strings.HasSuffix(name, "/") {
  274. name += "/"
  275. }
  276. }
  277. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  278. defer cancelFn()
  279. _, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
  280. Bucket: aws.String(fs.config.Bucket),
  281. Key: aws.String(name),
  282. })
  283. metrics.S3DeleteObjectCompleted(err)
  284. return err
  285. }
  286. // Mkdir creates a new directory with the specified name and default permissions
  287. func (fs S3Fs) Mkdir(name string) error {
  288. _, err := fs.Stat(name)
  289. if !fs.IsNotExist(err) {
  290. return err
  291. }
  292. if !strings.HasSuffix(name, "/") {
  293. name += "/"
  294. }
  295. _, w, _, err := fs.Create(name, 0)
  296. if err != nil {
  297. return err
  298. }
  299. return w.Close()
  300. }
  301. // Symlink creates source as a symbolic link to target.
  302. func (S3Fs) Symlink(source, target string) error {
  303. return errors.New("403 symlinks are not supported")
  304. }
  305. // Chown changes the numeric uid and gid of the named file.
  306. // Silently ignored.
  307. func (S3Fs) Chown(name string, uid int, gid int) error {
  308. return nil
  309. }
  310. // Chmod changes the mode of the named file to mode.
  311. // Silently ignored.
  312. func (S3Fs) Chmod(name string, mode os.FileMode) error {
  313. return nil
  314. }
  315. // Chtimes changes the access and modification times of the named file.
  316. // Silently ignored.
  317. func (S3Fs) Chtimes(name string, atime, mtime time.Time) error {
  318. return errors.New("403 chtimes is not supported")
  319. }
  320. // ReadDir reads the directory named by dirname and returns
  321. // a list of directory entries.
  322. func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  323. var result []os.FileInfo
  324. // dirname must be already cleaned
  325. prefix := ""
  326. if dirname != "/" && dirname != "." {
  327. prefix = strings.TrimPrefix(dirname, "/")
  328. if !strings.HasSuffix(prefix, "/") {
  329. prefix += "/"
  330. }
  331. }
  332. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  333. defer cancelFn()
  334. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  335. Bucket: aws.String(fs.config.Bucket),
  336. Prefix: aws.String(prefix),
  337. Delimiter: aws.String("/"),
  338. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  339. for _, p := range page.CommonPrefixes {
  340. name, isDir := fs.resolve(p.Prefix, prefix)
  341. result = append(result, NewFileInfo(name, isDir, 0, time.Time{}))
  342. }
  343. for _, fileObject := range page.Contents {
  344. objectSize := *fileObject.Size
  345. objectModTime := *fileObject.LastModified
  346. name, isDir := fs.resolve(fileObject.Key, prefix)
  347. if len(name) == 0 {
  348. continue
  349. }
  350. result = append(result, NewFileInfo(name, isDir, objectSize, objectModTime))
  351. }
  352. return true
  353. })
  354. metrics.S3ListObjectsCompleted(err)
  355. return result, err
  356. }
  357. // IsUploadResumeSupported returns true if upload resume is supported.
  358. // SFTP Resume is not supported on S3
  359. func (S3Fs) IsUploadResumeSupported() bool {
  360. return false
  361. }
  362. // IsAtomicUploadSupported returns true if atomic upload is supported.
  363. // S3 uploads are already atomic, we don't need to upload to a temporary
  364. // file
  365. func (S3Fs) IsAtomicUploadSupported() bool {
  366. return false
  367. }
  368. // IsNotExist returns a boolean indicating whether the error is known to
  369. // report that a file or directory does not exist
  370. func (S3Fs) IsNotExist(err error) bool {
  371. if err == nil {
  372. return false
  373. }
  374. if aerr, ok := err.(awserr.Error); ok {
  375. if aerr.Code() == s3.ErrCodeNoSuchKey {
  376. return true
  377. }
  378. if aerr.Code() == s3.ErrCodeNoSuchBucket {
  379. return true
  380. }
  381. }
  382. if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
  383. if multierr.Code() == s3.ErrCodeNoSuchKey {
  384. return true
  385. }
  386. if multierr.Code() == s3.ErrCodeNoSuchBucket {
  387. return true
  388. }
  389. }
  390. return strings.Contains(err.Error(), "404")
  391. }
  392. // IsPermission returns a boolean indicating whether the error is known to
  393. // report that permission is denied.
  394. func (S3Fs) IsPermission(err error) bool {
  395. if err == nil {
  396. return false
  397. }
  398. return strings.Contains(err.Error(), "403")
  399. }
  400. // CheckRootPath creates the specified root directory if it does not exists
  401. func (fs S3Fs) CheckRootPath(username string, uid int, gid int) bool {
  402. // we need a local directory for temporary files
  403. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
  404. osFs.CheckRootPath(username, uid, gid)
  405. return fs.checkIfBucketExists() != nil
  406. }
  407. // ScanRootDirContents returns the number of files contained in the bucket,
  408. // and their size
  409. func (fs S3Fs) ScanRootDirContents() (int, int64, error) {
  410. numFiles := 0
  411. size := int64(0)
  412. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  413. defer cancelFn()
  414. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  415. Bucket: aws.String(fs.config.Bucket),
  416. Prefix: aws.String(fs.config.KeyPrefix),
  417. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  418. for _, fileObject := range page.Contents {
  419. numFiles++
  420. size += *fileObject.Size
  421. }
  422. return true
  423. })
  424. metrics.S3ListObjectsCompleted(err)
  425. return numFiles, size, err
  426. }
  427. // GetAtomicUploadPath returns the path to use for an atomic upload.
  428. // S3 uploads are already atomic, we never call this method for S3
  429. func (S3Fs) GetAtomicUploadPath(name string) string {
  430. return ""
  431. }
  432. // GetRelativePath returns the path for a file relative to the user's home dir.
  433. // This is the path as seen by SFTP users
  434. func (fs S3Fs) GetRelativePath(name string) string {
  435. rel := path.Clean(name)
  436. if rel == "." {
  437. rel = ""
  438. }
  439. if !strings.HasPrefix(rel, "/") {
  440. return "/" + rel
  441. }
  442. if len(fs.config.KeyPrefix) > 0 {
  443. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  444. rel = "/"
  445. }
  446. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  447. }
  448. return rel
  449. }
  450. // Join joins any number of path elements into a single path
  451. func (S3Fs) Join(elem ...string) string {
  452. return path.Join(elem...)
  453. }
  454. // ResolvePath returns the matching filesystem path for the specified sftp path
  455. func (fs S3Fs) ResolvePath(sftpPath string) (string, error) {
  456. if !path.IsAbs(sftpPath) {
  457. sftpPath = path.Clean("/" + sftpPath)
  458. }
  459. return fs.Join("/", fs.config.KeyPrefix, sftpPath), nil
  460. }
  461. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  462. result := strings.TrimPrefix(*name, prefix)
  463. isDir := strings.HasSuffix(result, "/")
  464. if isDir {
  465. result = strings.TrimSuffix(result, "/")
  466. }
  467. if strings.Contains(result, "/") {
  468. i := strings.Index(result, "/")
  469. isDir = true
  470. result = result[:i]
  471. }
  472. return result, isDir
  473. }
  474. func (fs *S3Fs) isEqual(s3Key *string, sftpName string) bool {
  475. if *s3Key == sftpName {
  476. return true
  477. }
  478. if "/"+*s3Key == sftpName {
  479. return true
  480. }
  481. if "/"+*s3Key == sftpName+"/" {
  482. return true
  483. }
  484. return false
  485. }
  486. func (fs *S3Fs) checkIfBucketExists() error {
  487. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  488. defer cancelFn()
  489. _, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
  490. Bucket: aws.String(fs.config.Bucket),
  491. })
  492. metrics.S3HeadBucketCompleted(err)
  493. return err
  494. }
  495. func (fs *S3Fs) getObjectDetails(key string) (*s3.HeadObjectOutput, error) {
  496. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  497. defer cancelFn()
  498. input := &s3.HeadObjectInput{
  499. Bucket: aws.String(fs.config.Bucket),
  500. Key: aws.String(key),
  501. }
  502. return fs.svc.HeadObjectWithContext(ctx, input)
  503. }