s3fs.go

//go:build !nos3
// +build !nos3

package vfs

import (
	"context"
	"fmt"
	"mime"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/logger"
	"github.com/drakkan/sftpgo/v2/metric"
	"github.com/drakkan/sftpgo/v2/util"
	"github.com/drakkan/sftpgo/v2/version"
)

// using this mime type for directories improves compatibility with s3fs-fuse
const s3DirMimeType = "application/x-directory"

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty this fs is mounted as a virtual folder in the specified path
	mountPath      string
	config         *S3FsConfig
	svc            *s3.S3
	ctxTimeout     time.Duration
	ctxLongTimeout time.Duration
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &S3Fs{
		connectionID:   connectionID,
		localTempDir:   localTempDir,
		mountPath:      mountPath,
		config:         &config,
		ctxTimeout:     30 * time.Second,
		ctxLongTimeout: 300 * time.Second,
	}
	if err := fs.config.Validate(); err != nil {
		return fs, err
	}
	awsConfig := aws.NewConfig()
	if fs.config.Region != "" {
		awsConfig.WithRegion(fs.config.Region)
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), "")
	}
	if fs.config.Endpoint != "" {
		awsConfig.Endpoint = aws.String(fs.config.Endpoint)
	}
	if fs.config.ForcePathStyle {
		awsConfig.S3ForcePathStyle = aws.Bool(true)
	}
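	// part sizes are configured in MB, convert them to bytes for the SDK and
	// fall back to the s3manager defaults when they are not set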
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
	} else {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = s3manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = s3manager.DefaultDownloadPartSize
	} else {
		fs.config.DownloadPartSize *= 1024 * 1024
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = s3manager.DefaultDownloadConcurrency
	}
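	// enable shared config support so that region and credentials can also be
	// loaded from the AWS shared config and credentials files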
	sessOpts := session.Options{
		Config:            *awsConfig,
		SharedConfigState: session.SharedConfigEnable,
	}
	sess, err := session.NewSessionWithOptions(sessOpts)
	if err != nil {
		return fs, err
	}
	fs.svc = s3.New(sess)
	return fs, nil
}

// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "/" || name == "." {
		err := fs.checkIfBucketExists()
		if err != nil {
			return result, err
		}
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	}
	if "/"+fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// a "dir" has a trailing "/" so we cannot have a directory here
		objSize := *obj.ContentLength
		objectModTime := *obj.LastModified
		return NewFileInfo(name, false, objSize, objectModTime, false), nil
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory stored as a zero byte key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have
	// to create "dirs" by adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	objSize := *obj.ContentLength
	objectModTime := *obj.LastModified
	return NewFileInfo(name, true, objSize, objectModTime, false), nil
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := s3manager.NewDownloaderWithClient(fs.svc)
	if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
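		// apply a timeout to each part request and release it when the whole
		// download context is done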
		downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
			chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.DownloadPartMaxTime)*time.Second)
			r.SetContext(chunkCtx)
			go func() {
				<-ctx.Done()
				cancel()
			}()
		})
	}
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
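	// download the object in a background goroutine and stream it through the
	// pipe, the returned cancel function can be used to abort the transfer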
	go func() {
		defer cancelFn()
		n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		}, func(d *s3manager.Downloader) {
			d.Concurrency = fs.config.DownloadConcurrency
			d.PartSize = fs.config.DownloadPartSize
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}

// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := s3manager.NewUploaderWithClient(fs.svc)
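	// the upload runs in a background goroutine reading from the pipe while
	// the caller writes to the returned PipeWriter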
	go func() {
		defer cancelFn()
		key := name
		var contentType string
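		// a flag value of -1, see Mkdir, marks the key as a directory so the
		// s3fs-fuse compatible mime type is set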
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(key),
			Body:         r,
			ACL:          util.NilIfEmpty(fs.config.ACL),
			StorageClass: util.NilIfEmpty(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		}, func(u *s3manager.Uploader) {
			u.Concurrency = fs.config.UploadConcurrency
			u.PartSize = fs.config.UploadPartSize
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, acl: %#v, response: %v, read bytes: %v, err: %+v",
			name, fs.config.ACL, response, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we would have to
// rename all the contents too and this could take a long time: think
// about directories with thousands of files; for each file we would have
// to execute a CopyObject call.
// TODO: rename does not work for files bigger than 5GB, implement
// multipart copy or wait for this pull request to be merged:
//
// https://github.com/aws/aws-sdk-go/pull/2653
func (fs *S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	copySource := fs.Join(fs.config.Bucket, source)
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non empty directory: %#v", source)
		}
		if !strings.HasSuffix(copySource, "/") {
			copySource += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	var contentType string
	if fi.IsDir() {
		contentType = s3DirMimeType
	} else {
		contentType = mime.TypeByExtension(path.Ext(source))
	}
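	// S3 has no rename operation: copy the object to the new key, wait for
	// the copy to show up, then remove the source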
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(pathEscape(copySource)),
		Key:          aws.String(target),
		StorageClass: util.NilIfEmpty(fs.config.StorageClass),
		ACL:          util.NilIfEmpty(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		metric.S3CopyObjectCompleted(err)
		return err
	}
	err = fs.svc.WaitUntilObjectExistsWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(target),
	})
	metric.S3CopyObjectCompleted(err)
	if err != nil {
		return err
	}
	return fs.Remove(source, fi.IsDir())
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %#v", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
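	// create a zero byte object with a trailing "/" to emulate the directory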
	_, w, _, err := fs.Create(name, -1)
	if err != nil {
		return err
	}
	return w.Close()
}

// MkdirAll does nothing, we don't have real folders on S3
func (*S3Fs) MkdirAll(name string, uid int, gid int) error {
	return nil
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
	return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside the base transfer
func (*S3Fs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := ""
	if dirname != "/" && dirname != "." {
		prefix = strings.TrimPrefix(dirname, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
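	// track the prefixes already returned so that a directory is not listed
	// twice when it appears both as a common prefix and as a zero byte key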
	prefixes := make(map[string]bool)
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, p := range page.CommonPrefixes {
			// prefixes have a trailing slash
			name, _ := fs.resolve(p.Prefix, prefix)
			if name == "" {
				continue
			}
			if _, ok := prefixes[name]; ok {
				continue
			}
			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
			prefixes[name] = true
		}
		for _, fileObject := range page.Contents {
			objectSize := *fileObject.Size
			objectModTime := *fileObject.LastModified
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			if isDir {
				if _, ok := prefixes[name]; ok {
					continue
				}
				prefixes[name] = true
			}
			result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
	return result, err
}

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	if aerr, ok := err.(awserr.Error); ok {
		if aerr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if aerr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
	if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
		if multierr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if multierr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
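	// as a last resort check whether the error message contains a 404 status code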
	return strings.Contains(err.Error(), "404")
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "403")
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return err == ErrVfsUnsupported
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	numFiles := 0
	size := int64(0)
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(fs.config.KeyPrefix),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(*fileObject.Key, "/")
			if isDir && *fileObject.Size == 0 {
				continue
			}
			numFiles++
			size += *fileObject.Size
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
	return numFiles, size, err
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		return "/" + rel
	}
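	// paths outside the configured key prefix are mapped to the virtual root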
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := ""
	if root != "/" && root != "." {
		prefix = strings.TrimPrefix(root, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, fileObject := range page.Contents {
			objectSize := *fileObject.Size
			objectModTime := *fileObject.LastModified
			isDir := strings.HasSuffix(*fileObject.Key, "/")
			name := path.Clean(*fileObject.Key)
			if name == "/" || name == "." {
				continue
			}
			err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil)
			if err != nil {
				return false
			}
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
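	// walkFn is invoked for the root itself after all the objects have been
	// visited, forwarding any listing error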
	walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
	return err
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return path.Join(elem...)
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil
}

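// resolve strips the listing prefix from the given object key and returns the
// first remaining path component and whether it refers to a directory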
func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(*name, prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	if strings.Contains(result, "/") {
		i := strings.Index(result, "/")
		isDir = true
		result = result[:i]
	}
	return result, isDir
}

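// checkIfBucketExists sends a HeadBucket request to verify that the configured
// bucket is accessible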
func (fs *S3Fs) checkIfBucketExists() error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(fs.config.Bucket),
	})
	metric.S3HeadBucketCompleted(err)
	return err
}

func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := ""
	if name != "/" && name != "." {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
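	// listing at most two keys is enough to tell whether the prefix contains
	// anything besides the directory marker itself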
	maxResults := int64(2)
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxResults,
	})
	metric.S3ListObjectsCompleted(err)
	if err != nil {
		return false, err
	}
	// MinIO returns no contents while S3 returns 1 object
	// with the key equal to the prefix for empty directories
	for _, obj := range results.Contents {
		name, _ := fs.resolve(obj.Key, prefix)
		if name == "" || name == "/" {
			continue
		}
		return true, nil
	}
	return false, nil
}

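// headObject returns the metadata for the object with the given key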
func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return *obj.ContentType, err
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}

// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483, the code below is copied from rclone
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}