s3fs.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731
  1. // +build !nos3
  2. package vfs
  3. import (
  4. "context"
  5. "fmt"
  6. "mime"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/credentials"
  16. "github.com/aws/aws-sdk-go/aws/request"
  17. "github.com/aws/aws-sdk-go/aws/session"
  18. "github.com/aws/aws-sdk-go/service/s3"
  19. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  20. "github.com/eikenb/pipeat"
  21. "github.com/pkg/sftp"
  22. "github.com/drakkan/sftpgo/v2/logger"
  23. "github.com/drakkan/sftpgo/v2/metric"
  24. "github.com/drakkan/sftpgo/v2/util"
  25. "github.com/drakkan/sftpgo/v2/version"
  26. )
  27. // using this mime type for directories improves compatibility with s3fs-fuse
  28. const s3DirMimeType = "application/x-directory"
  29. // S3Fs is a Fs implementation for AWS S3 compatible object storages
  30. type S3Fs struct {
  31. connectionID string
  32. localTempDir string
  33. // if not empty this fs is mouted as virtual folder in the specified path
  34. mountPath string
  35. config *S3FsConfig
  36. svc *s3.S3
  37. ctxTimeout time.Duration
  38. ctxLongTimeout time.Duration
  39. }
  40. func init() {
  41. version.AddFeature("+s3")
  42. }
  43. // NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
  44. // object storage
  45. func NewS3Fs(connectionID, localTempDir, mountPath string, config S3FsConfig) (Fs, error) {
  46. if localTempDir == "" {
  47. if tempPath != "" {
  48. localTempDir = tempPath
  49. } else {
  50. localTempDir = filepath.Clean(os.TempDir())
  51. }
  52. }
  53. fs := &S3Fs{
  54. connectionID: connectionID,
  55. localTempDir: localTempDir,
  56. mountPath: mountPath,
  57. config: &config,
  58. ctxTimeout: 30 * time.Second,
  59. ctxLongTimeout: 300 * time.Second,
  60. }
  61. if err := fs.config.Validate(); err != nil {
  62. return fs, err
  63. }
  64. awsConfig := aws.NewConfig()
  65. if fs.config.Region != "" {
  66. awsConfig.WithRegion(fs.config.Region)
  67. }
  68. if !fs.config.AccessSecret.IsEmpty() {
  69. if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
  70. return fs, err
  71. }
  72. awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), "")
  73. }
  74. if fs.config.Endpoint != "" {
  75. awsConfig.Endpoint = aws.String(fs.config.Endpoint)
  76. }
  77. if fs.config.ForcePathStyle {
  78. awsConfig.S3ForcePathStyle = aws.Bool(true)
  79. }
  80. if fs.config.UploadPartSize == 0 {
  81. fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
  82. } else {
  83. fs.config.UploadPartSize *= 1024 * 1024
  84. }
  85. if fs.config.UploadConcurrency == 0 {
  86. fs.config.UploadConcurrency = s3manager.DefaultUploadConcurrency
  87. }
  88. if fs.config.DownloadPartSize == 0 {
  89. fs.config.DownloadPartSize = s3manager.DefaultDownloadPartSize
  90. } else {
  91. fs.config.DownloadPartSize *= 1024 * 1024
  92. }
  93. if fs.config.DownloadConcurrency == 0 {
  94. fs.config.DownloadConcurrency = s3manager.DefaultDownloadConcurrency
  95. }
  96. sessOpts := session.Options{
  97. Config: *awsConfig,
  98. SharedConfigState: session.SharedConfigEnable,
  99. }
  100. sess, err := session.NewSessionWithOptions(sessOpts)
  101. if err != nil {
  102. return fs, err
  103. }
  104. fs.svc = s3.New(sess)
  105. return fs, nil
  106. }
  107. // Name returns the name for the Fs implementation
  108. func (fs *S3Fs) Name() string {
  109. return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
  110. }
  111. // ConnectionID returns the connection ID associated to this Fs implementation
  112. func (fs *S3Fs) ConnectionID() string {
  113. return fs.connectionID
  114. }
  115. // Stat returns a FileInfo describing the named file
  116. func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
  117. var result *FileInfo
  118. if name == "/" || name == "." {
  119. err := fs.checkIfBucketExists()
  120. if err != nil {
  121. return result, err
  122. }
  123. return NewFileInfo(name, true, 0, time.Now(), false), nil
  124. }
  125. if "/"+fs.config.KeyPrefix == name+"/" {
  126. return NewFileInfo(name, true, 0, time.Now(), false), nil
  127. }
  128. obj, err := fs.headObject(name)
  129. if err == nil {
  130. // a "dir" has a trailing "/" so we cannot have a directory here
  131. objSize := *obj.ContentLength
  132. objectModTime := *obj.LastModified
  133. return NewFileInfo(name, false, objSize, objectModTime, false), nil
  134. }
  135. if !fs.IsNotExist(err) {
  136. return result, err
  137. }
  138. // now check if this is a prefix (virtual directory)
  139. hasContents, err := fs.hasContents(name)
  140. if err == nil && hasContents {
  141. return NewFileInfo(name, true, 0, time.Now(), false), nil
  142. } else if err != nil {
  143. return nil, err
  144. }
  145. // the requested file may still be a directory as a zero bytes key
  146. // with a trailing forward slash (created using mkdir).
  147. // S3 doesn't return content type when listing objects, so we have
  148. // create "dirs" adding a trailing "/" to the key
  149. return fs.getStatForDir(name)
  150. }
  151. func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
  152. var result *FileInfo
  153. obj, err := fs.headObject(name + "/")
  154. if err != nil {
  155. return result, err
  156. }
  157. objSize := *obj.ContentLength
  158. objectModTime := *obj.LastModified
  159. return NewFileInfo(name, true, objSize, objectModTime, false), nil
  160. }
  161. // Lstat returns a FileInfo describing the named file
  162. func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
  163. return fs.Stat(name)
  164. }
  165. // Open opens the named file for reading
  166. func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
  167. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  168. if err != nil {
  169. return nil, nil, nil, err
  170. }
  171. ctx, cancelFn := context.WithCancel(context.Background())
  172. downloader := s3manager.NewDownloaderWithClient(fs.svc)
  173. if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
  174. downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
  175. chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.DownloadPartMaxTime)*time.Second)
  176. r.SetContext(chunkCtx)
  177. go func() {
  178. <-ctx.Done()
  179. cancel()
  180. }()
  181. })
  182. }
  183. var streamRange *string
  184. if offset > 0 {
  185. streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
  186. }
  187. go func() {
  188. defer cancelFn()
  189. n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
  190. Bucket: aws.String(fs.config.Bucket),
  191. Key: aws.String(name),
  192. Range: streamRange,
  193. }, func(d *s3manager.Downloader) {
  194. d.Concurrency = fs.config.DownloadConcurrency
  195. d.PartSize = fs.config.DownloadPartSize
  196. })
  197. w.CloseWithError(err) //nolint:errcheck
  198. fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
  199. metric.S3TransferCompleted(n, 1, err)
  200. }()
  201. return nil, r, cancelFn, nil
  202. }
  203. // Create creates or opens the named file for writing
  204. func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
  205. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  206. if err != nil {
  207. return nil, nil, nil, err
  208. }
  209. p := NewPipeWriter(w)
  210. ctx, cancelFn := context.WithCancel(context.Background())
  211. uploader := s3manager.NewUploaderWithClient(fs.svc)
  212. go func() {
  213. defer cancelFn()
  214. key := name
  215. var contentType string
  216. if flag == -1 {
  217. contentType = s3DirMimeType
  218. } else {
  219. contentType = mime.TypeByExtension(path.Ext(name))
  220. }
  221. response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
  222. Bucket: aws.String(fs.config.Bucket),
  223. Key: aws.String(key),
  224. Body: r,
  225. StorageClass: util.NilIfEmpty(fs.config.StorageClass),
  226. ContentType: util.NilIfEmpty(contentType),
  227. }, func(u *s3manager.Uploader) {
  228. u.Concurrency = fs.config.UploadConcurrency
  229. u.PartSize = fs.config.UploadPartSize
  230. })
  231. r.CloseWithError(err) //nolint:errcheck
  232. p.Done(err)
  233. fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, readed bytes: %v, err: %+v",
  234. name, response, r.GetReadedBytes(), err)
  235. metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
  236. }()
  237. return nil, p, cancelFn, nil
  238. }
  239. // Rename renames (moves) source to target.
  240. // We don't support renaming non empty directories since we should
  241. // rename all the contents too and this could take long time: think
  242. // about directories with thousands of files, for each file we should
  243. // execute a CopyObject call.
  244. // TODO: rename does not work for files bigger than 5GB, implement
  245. // multipart copy or wait for this pull request to be merged:
  246. //
  247. // https://github.com/aws/aws-sdk-go/pull/2653
  248. //
  249. func (fs *S3Fs) Rename(source, target string) error {
  250. if source == target {
  251. return nil
  252. }
  253. fi, err := fs.Stat(source)
  254. if err != nil {
  255. return err
  256. }
  257. copySource := fs.Join(fs.config.Bucket, source)
  258. if fi.IsDir() {
  259. hasContents, err := fs.hasContents(source)
  260. if err != nil {
  261. return err
  262. }
  263. if hasContents {
  264. return fmt.Errorf("cannot rename non empty directory: %#v", source)
  265. }
  266. if !strings.HasSuffix(copySource, "/") {
  267. copySource += "/"
  268. }
  269. if !strings.HasSuffix(target, "/") {
  270. target += "/"
  271. }
  272. }
  273. var contentType string
  274. if fi.IsDir() {
  275. contentType = s3DirMimeType
  276. } else {
  277. contentType = mime.TypeByExtension(path.Ext(source))
  278. }
  279. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  280. defer cancelFn()
  281. _, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
  282. Bucket: aws.String(fs.config.Bucket),
  283. CopySource: aws.String(pathEscape(copySource)),
  284. Key: aws.String(target),
  285. StorageClass: util.NilIfEmpty(fs.config.StorageClass),
  286. ContentType: util.NilIfEmpty(contentType),
  287. })
  288. if err != nil {
  289. metric.S3CopyObjectCompleted(err)
  290. return err
  291. }
  292. err = fs.svc.WaitUntilObjectExistsWithContext(ctx, &s3.HeadObjectInput{
  293. Bucket: aws.String(fs.config.Bucket),
  294. Key: aws.String(target),
  295. })
  296. metric.S3CopyObjectCompleted(err)
  297. if err != nil {
  298. return err
  299. }
  300. return fs.Remove(source, fi.IsDir())
  301. }
  302. // Remove removes the named file or (empty) directory.
  303. func (fs *S3Fs) Remove(name string, isDir bool) error {
  304. if isDir {
  305. hasContents, err := fs.hasContents(name)
  306. if err != nil {
  307. return err
  308. }
  309. if hasContents {
  310. return fmt.Errorf("cannot remove non empty directory: %#v", name)
  311. }
  312. if !strings.HasSuffix(name, "/") {
  313. name += "/"
  314. }
  315. }
  316. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  317. defer cancelFn()
  318. _, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
  319. Bucket: aws.String(fs.config.Bucket),
  320. Key: aws.String(name),
  321. })
  322. metric.S3DeleteObjectCompleted(err)
  323. return err
  324. }
  325. // Mkdir creates a new directory with the specified name and default permissions
  326. func (fs *S3Fs) Mkdir(name string) error {
  327. _, err := fs.Stat(name)
  328. if !fs.IsNotExist(err) {
  329. return err
  330. }
  331. if !strings.HasSuffix(name, "/") {
  332. name += "/"
  333. }
  334. _, w, _, err := fs.Create(name, -1)
  335. if err != nil {
  336. return err
  337. }
  338. return w.Close()
  339. }
  340. // MkdirAll does nothing, we don't have folder
  341. func (*S3Fs) MkdirAll(name string, uid int, gid int) error {
  342. return nil
  343. }
  344. // Symlink creates source as a symbolic link to target.
  345. func (*S3Fs) Symlink(source, target string) error {
  346. return ErrVfsUnsupported
  347. }
  348. // Readlink returns the destination of the named symbolic link
  349. func (*S3Fs) Readlink(name string) (string, error) {
  350. return "", ErrVfsUnsupported
  351. }
  352. // Chown changes the numeric uid and gid of the named file.
  353. func (*S3Fs) Chown(name string, uid int, gid int) error {
  354. return ErrVfsUnsupported
  355. }
  356. // Chmod changes the mode of the named file to mode.
  357. func (*S3Fs) Chmod(name string, mode os.FileMode) error {
  358. return ErrVfsUnsupported
  359. }
  360. // Chtimes changes the access and modification times of the named file.
  361. func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
  362. return ErrVfsUnsupported
  363. }
  364. // Truncate changes the size of the named file.
  365. // Truncate by path is not supported, while truncating an opened
  366. // file is handled inside base transfer
  367. func (*S3Fs) Truncate(name string, size int64) error {
  368. return ErrVfsUnsupported
  369. }
  370. // ReadDir reads the directory named by dirname and returns
  371. // a list of directory entries.
  372. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  373. var result []os.FileInfo
  374. // dirname must be already cleaned
  375. prefix := ""
  376. if dirname != "/" && dirname != "." {
  377. prefix = strings.TrimPrefix(dirname, "/")
  378. if !strings.HasSuffix(prefix, "/") {
  379. prefix += "/"
  380. }
  381. }
  382. prefixes := make(map[string]bool)
  383. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  384. defer cancelFn()
  385. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  386. Bucket: aws.String(fs.config.Bucket),
  387. Prefix: aws.String(prefix),
  388. Delimiter: aws.String("/"),
  389. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  390. for _, p := range page.CommonPrefixes {
  391. // prefixes have a trailing slash
  392. name, _ := fs.resolve(p.Prefix, prefix)
  393. if name == "" {
  394. continue
  395. }
  396. if _, ok := prefixes[name]; ok {
  397. continue
  398. }
  399. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  400. prefixes[name] = true
  401. }
  402. for _, fileObject := range page.Contents {
  403. objectSize := *fileObject.Size
  404. objectModTime := *fileObject.LastModified
  405. name, isDir := fs.resolve(fileObject.Key, prefix)
  406. if name == "" {
  407. continue
  408. }
  409. if isDir {
  410. if _, ok := prefixes[name]; ok {
  411. continue
  412. }
  413. prefixes[name] = true
  414. }
  415. result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
  416. }
  417. return true
  418. })
  419. metric.S3ListObjectsCompleted(err)
  420. return result, err
  421. }
  422. // IsUploadResumeSupported returns true if resuming uploads is supported.
  423. // Resuming uploads is not supported on S3
  424. func (*S3Fs) IsUploadResumeSupported() bool {
  425. return false
  426. }
  427. // IsAtomicUploadSupported returns true if atomic upload is supported.
  428. // S3 uploads are already atomic, we don't need to upload to a temporary
  429. // file
  430. func (*S3Fs) IsAtomicUploadSupported() bool {
  431. return false
  432. }
  433. // IsNotExist returns a boolean indicating whether the error is known to
  434. // report that a file or directory does not exist
  435. func (*S3Fs) IsNotExist(err error) bool {
  436. if err == nil {
  437. return false
  438. }
  439. if aerr, ok := err.(awserr.Error); ok {
  440. if aerr.Code() == s3.ErrCodeNoSuchKey {
  441. return true
  442. }
  443. if aerr.Code() == s3.ErrCodeNoSuchBucket {
  444. return true
  445. }
  446. }
  447. if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
  448. if multierr.Code() == s3.ErrCodeNoSuchKey {
  449. return true
  450. }
  451. if multierr.Code() == s3.ErrCodeNoSuchBucket {
  452. return true
  453. }
  454. }
  455. return strings.Contains(err.Error(), "404")
  456. }
  457. // IsPermission returns a boolean indicating whether the error is known to
  458. // report that permission is denied.
  459. func (*S3Fs) IsPermission(err error) bool {
  460. if err == nil {
  461. return false
  462. }
  463. return strings.Contains(err.Error(), "403")
  464. }
  465. // IsNotSupported returns true if the error indicate an unsupported operation
  466. func (*S3Fs) IsNotSupported(err error) bool {
  467. if err == nil {
  468. return false
  469. }
  470. return err == ErrVfsUnsupported
  471. }
  472. // CheckRootPath creates the specified local root directory if it does not exists
  473. func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
  474. // we need a local directory for temporary files
  475. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
  476. return osFs.CheckRootPath(username, uid, gid)
  477. }
  478. // ScanRootDirContents returns the number of files contained in the bucket,
  479. // and their size
  480. func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
  481. numFiles := 0
  482. size := int64(0)
  483. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  484. defer cancelFn()
  485. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  486. Bucket: aws.String(fs.config.Bucket),
  487. Prefix: aws.String(fs.config.KeyPrefix),
  488. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  489. for _, fileObject := range page.Contents {
  490. isDir := strings.HasSuffix(*fileObject.Key, "/")
  491. if isDir && *fileObject.Size == 0 {
  492. continue
  493. }
  494. numFiles++
  495. size += *fileObject.Size
  496. }
  497. return true
  498. })
  499. metric.S3ListObjectsCompleted(err)
  500. return numFiles, size, err
  501. }
  502. // GetDirSize returns the number of files and the size for a folder
  503. // including any subfolders
  504. func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
  505. return 0, 0, ErrVfsUnsupported
  506. }
  507. // GetAtomicUploadPath returns the path to use for an atomic upload.
  508. // S3 uploads are already atomic, we never call this method for S3
  509. func (*S3Fs) GetAtomicUploadPath(name string) string {
  510. return ""
  511. }
  512. // GetRelativePath returns the path for a file relative to the user's home dir.
  513. // This is the path as seen by SFTPGo users
  514. func (fs *S3Fs) GetRelativePath(name string) string {
  515. rel := path.Clean(name)
  516. if rel == "." {
  517. rel = ""
  518. }
  519. if !path.IsAbs(rel) {
  520. return "/" + rel
  521. }
  522. if fs.config.KeyPrefix != "" {
  523. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  524. rel = "/"
  525. }
  526. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  527. }
  528. if fs.mountPath != "" {
  529. rel = path.Join(fs.mountPath, rel)
  530. }
  531. return rel
  532. }
  533. // Walk walks the file tree rooted at root, calling walkFn for each file or
  534. // directory in the tree, including root. The result are unordered
  535. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  536. prefix := ""
  537. if root != "/" && root != "." {
  538. prefix = strings.TrimPrefix(root, "/")
  539. if !strings.HasSuffix(prefix, "/") {
  540. prefix += "/"
  541. }
  542. }
  543. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  544. defer cancelFn()
  545. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  546. Bucket: aws.String(fs.config.Bucket),
  547. Prefix: aws.String(prefix),
  548. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  549. for _, fileObject := range page.Contents {
  550. objectSize := *fileObject.Size
  551. objectModTime := *fileObject.LastModified
  552. isDir := strings.HasSuffix(*fileObject.Key, "/")
  553. name := path.Clean(*fileObject.Key)
  554. if name == "/" || name == "." {
  555. continue
  556. }
  557. err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil)
  558. if err != nil {
  559. return false
  560. }
  561. }
  562. return true
  563. })
  564. metric.S3ListObjectsCompleted(err)
  565. walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
  566. return err
  567. }
  568. // Join joins any number of path elements into a single path
  569. func (*S3Fs) Join(elem ...string) string {
  570. return path.Join(elem...)
  571. }
  572. // HasVirtualFolders returns true if folders are emulated
  573. func (*S3Fs) HasVirtualFolders() bool {
  574. return true
  575. }
  576. // ResolvePath returns the matching filesystem path for the specified virtual path
  577. func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
  578. if fs.mountPath != "" {
  579. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  580. }
  581. if !path.IsAbs(virtualPath) {
  582. virtualPath = path.Clean("/" + virtualPath)
  583. }
  584. return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil
  585. }
  586. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  587. result := strings.TrimPrefix(*name, prefix)
  588. isDir := strings.HasSuffix(result, "/")
  589. if isDir {
  590. result = strings.TrimSuffix(result, "/")
  591. }
  592. if strings.Contains(result, "/") {
  593. i := strings.Index(result, "/")
  594. isDir = true
  595. result = result[:i]
  596. }
  597. return result, isDir
  598. }
  599. func (fs *S3Fs) checkIfBucketExists() error {
  600. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  601. defer cancelFn()
  602. _, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
  603. Bucket: aws.String(fs.config.Bucket),
  604. })
  605. metric.S3HeadBucketCompleted(err)
  606. return err
  607. }
  608. func (fs *S3Fs) hasContents(name string) (bool, error) {
  609. prefix := ""
  610. if name != "/" && name != "." {
  611. prefix = strings.TrimPrefix(name, "/")
  612. if !strings.HasSuffix(prefix, "/") {
  613. prefix += "/"
  614. }
  615. }
  616. maxResults := int64(2)
  617. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  618. defer cancelFn()
  619. results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
  620. Bucket: aws.String(fs.config.Bucket),
  621. Prefix: aws.String(prefix),
  622. MaxKeys: &maxResults,
  623. })
  624. metric.S3ListObjectsCompleted(err)
  625. if err != nil {
  626. return false, err
  627. }
  628. // MinIO returns no contents while S3 returns 1 object
  629. // with the key equal to the prefix for empty directories
  630. for _, obj := range results.Contents {
  631. name, _ := fs.resolve(obj.Key, prefix)
  632. if name == "" || name == "/" {
  633. continue
  634. }
  635. return true, nil
  636. }
  637. return false, nil
  638. }
  639. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  640. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  641. defer cancelFn()
  642. obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
  643. Bucket: aws.String(fs.config.Bucket),
  644. Key: aws.String(name),
  645. })
  646. metric.S3HeadObjectCompleted(err)
  647. return obj, err
  648. }
  649. // GetMimeType returns the content type
  650. func (fs *S3Fs) GetMimeType(name string) (string, error) {
  651. obj, err := fs.headObject(name)
  652. if err != nil {
  653. return "", err
  654. }
  655. return *obj.ContentType, err
  656. }
  657. // Close closes the fs
  658. func (*S3Fs) Close() error {
  659. return nil
  660. }
  661. // GetAvailableDiskSize return the available size for the specified path
  662. func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
  663. return nil, ErrStorageSizeUnavailable
  664. }
  665. // ideally we should simply use url.PathEscape:
  666. //
  667. // https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
  668. //
  669. // but this cause issue with some vendors, see #483, the code below is copied from rclone
  670. func pathEscape(in string) string {
  671. var u url.URL
  672. u.Path = in
  673. return strings.ReplaceAll(u.String(), "+", "%2B")
  674. }