s3fs.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. // +build !nos3
  2. package vfs
import (
	"context"
	"errors"
	"fmt"
	"mime"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/eikenb/pipeat"

	"github.com/drakkan/sftpgo/logger"
	"github.com/drakkan/sftpgo/metrics"
	"github.com/drakkan/sftpgo/utils"
	"github.com/drakkan/sftpgo/version"
)
// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	// connectionID is the identifier returned by ConnectionID
	connectionID string
	// localTempDir is the local directory used for the pipes created by
	// Open and Create and checked by CheckRootPath
	localTempDir string
	// config holds the bucket, key prefix, credentials and upload settings
	config S3FsConfig
	// svc is the S3 client, it is set by NewS3Fs
	svc *s3.S3
	// ctxTimeout is the deadline applied to most S3 requests
	ctxTimeout time.Duration
	// ctxLongTimeout is the deadline used by ScanRootDirContents
	ctxLongTimeout time.Duration
}
  34. func init() {
  35. version.AddFeature("+s3")
  36. }
  37. // NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
  38. // object storage
  39. func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
  40. fs := &S3Fs{
  41. connectionID: connectionID,
  42. localTempDir: localTempDir,
  43. config: config,
  44. ctxTimeout: 30 * time.Second,
  45. ctxLongTimeout: 300 * time.Second,
  46. }
  47. if err := ValidateS3FsConfig(&fs.config); err != nil {
  48. return fs, err
  49. }
  50. awsConfig := aws.NewConfig()
  51. if fs.config.Region != "" {
  52. awsConfig.WithRegion(fs.config.Region)
  53. }
  54. if fs.config.AccessSecret.IsEncrypted() {
  55. err := fs.config.AccessSecret.Decrypt()
  56. if err != nil {
  57. return fs, err
  58. }
  59. awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret.Payload, "")
  60. }
  61. if fs.config.Endpoint != "" {
  62. awsConfig.Endpoint = aws.String(fs.config.Endpoint)
  63. awsConfig.S3ForcePathStyle = aws.Bool(true)
  64. }
  65. if fs.config.UploadPartSize == 0 {
  66. fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
  67. } else {
  68. fs.config.UploadPartSize *= 1024 * 1024
  69. }
  70. if fs.config.UploadConcurrency == 0 {
  71. fs.config.UploadConcurrency = 2
  72. }
  73. sessOpts := session.Options{
  74. Config: *awsConfig,
  75. SharedConfigState: session.SharedConfigEnable,
  76. }
  77. sess, err := session.NewSessionWithOptions(sessOpts)
  78. if err != nil {
  79. return fs, err
  80. }
  81. fs.svc = s3.New(sess)
  82. return fs, nil
  83. }
  84. // Name returns the name for the Fs implementation
  85. func (fs *S3Fs) Name() string {
  86. return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
  87. }
  88. // ConnectionID returns the connection ID associated to this Fs implementation
  89. func (fs *S3Fs) ConnectionID() string {
  90. return fs.connectionID
  91. }
  92. // Stat returns a FileInfo describing the named file
  93. func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
  94. var result FileInfo
  95. if name == "/" || name == "." {
  96. err := fs.checkIfBucketExists()
  97. if err != nil {
  98. return result, err
  99. }
  100. return NewFileInfo(name, true, 0, time.Now(), false), nil
  101. }
  102. if "/"+fs.config.KeyPrefix == name+"/" {
  103. return NewFileInfo(name, true, 0, time.Now(), false), nil
  104. }
  105. obj, err := fs.headObject(name)
  106. if err == nil {
  107. objSize := *obj.ContentLength
  108. objectModTime := *obj.LastModified
  109. return NewFileInfo(name, false, objSize, objectModTime, false), nil
  110. }
  111. if !fs.IsNotExist(err) {
  112. return result, err
  113. }
  114. // now check if this is a prefix (virtual directory)
  115. hasContents, err := fs.hasContents(name)
  116. if err == nil && hasContents {
  117. return NewFileInfo(name, true, 0, time.Now(), false), nil
  118. } else if err != nil {
  119. return nil, err
  120. }
  121. // the requested file could still be a directory as a zero bytes key
  122. // with a forwarding slash.
  123. // As last resort we do a list dir to find it
  124. return fs.getStatCompat(name)
  125. }
  126. func (fs *S3Fs) getStatCompat(name string) (os.FileInfo, error) {
  127. var result FileInfo
  128. prefix := path.Dir(name)
  129. if prefix == "/" || prefix == "." {
  130. prefix = ""
  131. } else {
  132. prefix = strings.TrimPrefix(prefix, "/")
  133. if !strings.HasSuffix(prefix, "/") {
  134. prefix += "/"
  135. }
  136. }
  137. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  138. defer cancelFn()
  139. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  140. Bucket: aws.String(fs.config.Bucket),
  141. Prefix: aws.String(prefix),
  142. Delimiter: aws.String("/"),
  143. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  144. for _, p := range page.CommonPrefixes {
  145. if fs.isEqual(p.Prefix, name) {
  146. result = NewFileInfo(name, true, 0, time.Now(), false)
  147. return false
  148. }
  149. }
  150. for _, fileObject := range page.Contents {
  151. if fs.isEqual(fileObject.Key, name) {
  152. objectSize := *fileObject.Size
  153. objectModTime := *fileObject.LastModified
  154. isDir := strings.HasSuffix(*fileObject.Key, "/") && objectSize == 0
  155. result = NewFileInfo(name, isDir, objectSize, objectModTime, false)
  156. return false
  157. }
  158. }
  159. return true
  160. })
  161. metrics.S3ListObjectsCompleted(err)
  162. if err == nil && result.Name() == "" {
  163. err = errors.New("404 no such file or directory")
  164. }
  165. return result, err
  166. }
  167. // Lstat returns a FileInfo describing the named file
  168. func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
  169. return fs.Stat(name)
  170. }
  171. // Open opens the named file for reading
  172. func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
  173. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  174. if err != nil {
  175. return nil, nil, nil, err
  176. }
  177. ctx, cancelFn := context.WithCancel(context.Background())
  178. downloader := s3manager.NewDownloaderWithClient(fs.svc)
  179. var streamRange *string
  180. if offset > 0 {
  181. streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
  182. }
  183. go func() {
  184. defer cancelFn()
  185. n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
  186. Bucket: aws.String(fs.config.Bucket),
  187. Key: aws.String(name),
  188. Range: streamRange,
  189. })
  190. w.CloseWithError(err) //nolint:errcheck
  191. fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
  192. metrics.S3TransferCompleted(n, 1, err)
  193. }()
  194. return nil, r, cancelFn, nil
  195. }
  196. // Create creates or opens the named file for writing
  197. func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
  198. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  199. if err != nil {
  200. return nil, nil, nil, err
  201. }
  202. p := NewPipeWriter(w)
  203. ctx, cancelFn := context.WithCancel(context.Background())
  204. uploader := s3manager.NewUploaderWithClient(fs.svc)
  205. go func() {
  206. defer cancelFn()
  207. key := name
  208. var contentType string
  209. if flag == -1 {
  210. contentType = dirMimeType
  211. } else {
  212. contentType = mime.TypeByExtension(path.Ext(name))
  213. }
  214. response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
  215. Bucket: aws.String(fs.config.Bucket),
  216. Key: aws.String(key),
  217. Body: r,
  218. StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
  219. ContentType: utils.NilIfEmpty(contentType),
  220. }, func(u *s3manager.Uploader) {
  221. u.Concurrency = fs.config.UploadConcurrency
  222. u.PartSize = fs.config.UploadPartSize
  223. })
  224. r.CloseWithError(err) //nolint:errcheck
  225. p.Done(err)
  226. fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, readed bytes: %v, err: %+v",
  227. name, response, r.GetReadedBytes(), err)
  228. metrics.S3TransferCompleted(r.GetReadedBytes(), 0, err)
  229. }()
  230. return nil, p, cancelFn, nil
  231. }
  232. // Rename renames (moves) source to target.
  233. // We don't support renaming non empty directories since we should
  234. // rename all the contents too and this could take long time: think
  235. // about directories with thousands of files, for each file we should
  236. // execute a CopyObject call.
  237. // TODO: rename does not work for files bigger than 5GB, implement
  238. // multipart copy or wait for this pull request to be merged:
  239. //
  240. // https://github.com/aws/aws-sdk-go/pull/2653
  241. //
  242. func (fs *S3Fs) Rename(source, target string) error {
  243. if source == target {
  244. return nil
  245. }
  246. fi, err := fs.Stat(source)
  247. if err != nil {
  248. return err
  249. }
  250. copySource := fs.Join(fs.config.Bucket, source)
  251. if fi.IsDir() {
  252. hasContents, err := fs.hasContents(source)
  253. if err != nil {
  254. return err
  255. }
  256. if hasContents {
  257. return fmt.Errorf("Cannot rename non empty directory: %#v", source)
  258. }
  259. if !strings.HasSuffix(copySource, "/") {
  260. copySource += "/"
  261. }
  262. if !strings.HasSuffix(target, "/") {
  263. target += "/"
  264. }
  265. }
  266. var contentType string
  267. if fi.IsDir() {
  268. contentType = dirMimeType
  269. } else {
  270. contentType = mime.TypeByExtension(path.Ext(source))
  271. }
  272. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  273. defer cancelFn()
  274. _, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
  275. Bucket: aws.String(fs.config.Bucket),
  276. CopySource: aws.String(copySource),
  277. Key: aws.String(target),
  278. StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
  279. ContentType: utils.NilIfEmpty(contentType),
  280. })
  281. metrics.S3CopyObjectCompleted(err)
  282. if err != nil {
  283. return err
  284. }
  285. return fs.Remove(source, fi.IsDir())
  286. }
  287. // Remove removes the named file or (empty) directory.
  288. func (fs *S3Fs) Remove(name string, isDir bool) error {
  289. if isDir {
  290. hasContents, err := fs.hasContents(name)
  291. if err != nil {
  292. return err
  293. }
  294. if hasContents {
  295. return fmt.Errorf("Cannot remove non empty directory: %#v", name)
  296. }
  297. if !strings.HasSuffix(name, "/") {
  298. name += "/"
  299. }
  300. }
  301. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  302. defer cancelFn()
  303. _, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
  304. Bucket: aws.String(fs.config.Bucket),
  305. Key: aws.String(name),
  306. })
  307. metrics.S3DeleteObjectCompleted(err)
  308. return err
  309. }
  310. // Mkdir creates a new directory with the specified name and default permissions
  311. func (fs *S3Fs) Mkdir(name string) error {
  312. _, err := fs.Stat(name)
  313. if !fs.IsNotExist(err) {
  314. return err
  315. }
  316. if !strings.HasSuffix(name, "/") {
  317. name += "/"
  318. }
  319. _, w, _, err := fs.Create(name, -1)
  320. if err != nil {
  321. return err
  322. }
  323. return w.Close()
  324. }
  325. // Symlink creates source as a symbolic link to target.
  326. func (*S3Fs) Symlink(source, target string) error {
  327. return ErrVfsUnsupported
  328. }
  329. // Readlink returns the destination of the named symbolic link
  330. func (*S3Fs) Readlink(name string) (string, error) {
  331. return "", ErrVfsUnsupported
  332. }
  333. // Chown changes the numeric uid and gid of the named file.
  334. func (*S3Fs) Chown(name string, uid int, gid int) error {
  335. return ErrVfsUnsupported
  336. }
  337. // Chmod changes the mode of the named file to mode.
  338. func (*S3Fs) Chmod(name string, mode os.FileMode) error {
  339. return ErrVfsUnsupported
  340. }
  341. // Chtimes changes the access and modification times of the named file.
  342. func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
  343. return ErrVfsUnsupported
  344. }
  345. // Truncate changes the size of the named file.
  346. // Truncate by path is not supported, while truncating an opened
  347. // file is handled inside base transfer
  348. func (*S3Fs) Truncate(name string, size int64) error {
  349. return ErrVfsUnsupported
  350. }
  351. // ReadDir reads the directory named by dirname and returns
  352. // a list of directory entries.
  353. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  354. var result []os.FileInfo
  355. // dirname must be already cleaned
  356. prefix := ""
  357. if dirname != "/" && dirname != "." {
  358. prefix = strings.TrimPrefix(dirname, "/")
  359. if !strings.HasSuffix(prefix, "/") {
  360. prefix += "/"
  361. }
  362. }
  363. prefixes := make(map[string]bool)
  364. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  365. defer cancelFn()
  366. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  367. Bucket: aws.String(fs.config.Bucket),
  368. Prefix: aws.String(prefix),
  369. Delimiter: aws.String("/"),
  370. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  371. for _, p := range page.CommonPrefixes {
  372. // prefixes have a trailing slash
  373. name, _ := fs.resolve(p.Prefix, prefix)
  374. if name == "" {
  375. continue
  376. }
  377. if _, ok := prefixes[name]; ok {
  378. continue
  379. }
  380. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  381. prefixes[name] = true
  382. }
  383. for _, fileObject := range page.Contents {
  384. objectSize := *fileObject.Size
  385. objectModTime := *fileObject.LastModified
  386. name, isDir := fs.resolve(fileObject.Key, prefix)
  387. if name == "" {
  388. continue
  389. }
  390. if isDir {
  391. if _, ok := prefixes[name]; ok {
  392. continue
  393. }
  394. prefixes[name] = true
  395. }
  396. result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
  397. }
  398. return true
  399. })
  400. metrics.S3ListObjectsCompleted(err)
  401. return result, err
  402. }
  403. // IsUploadResumeSupported returns true if upload resume is supported.
  404. // SFTP Resume is not supported on S3
  405. func (*S3Fs) IsUploadResumeSupported() bool {
  406. return false
  407. }
  408. // IsAtomicUploadSupported returns true if atomic upload is supported.
  409. // S3 uploads are already atomic, we don't need to upload to a temporary
  410. // file
  411. func (*S3Fs) IsAtomicUploadSupported() bool {
  412. return false
  413. }
  414. // IsNotExist returns a boolean indicating whether the error is known to
  415. // report that a file or directory does not exist
  416. func (*S3Fs) IsNotExist(err error) bool {
  417. if err == nil {
  418. return false
  419. }
  420. if aerr, ok := err.(awserr.Error); ok {
  421. if aerr.Code() == s3.ErrCodeNoSuchKey {
  422. return true
  423. }
  424. if aerr.Code() == s3.ErrCodeNoSuchBucket {
  425. return true
  426. }
  427. }
  428. if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
  429. if multierr.Code() == s3.ErrCodeNoSuchKey {
  430. return true
  431. }
  432. if multierr.Code() == s3.ErrCodeNoSuchBucket {
  433. return true
  434. }
  435. }
  436. return strings.Contains(err.Error(), "404")
  437. }
  438. // IsPermission returns a boolean indicating whether the error is known to
  439. // report that permission is denied.
  440. func (*S3Fs) IsPermission(err error) bool {
  441. if err == nil {
  442. return false
  443. }
  444. return strings.Contains(err.Error(), "403")
  445. }
  446. // IsNotSupported returns true if the error indicate an unsupported operation
  447. func (*S3Fs) IsNotSupported(err error) bool {
  448. if err == nil {
  449. return false
  450. }
  451. return err == ErrVfsUnsupported
  452. }
  453. // CheckRootPath creates the specified local root directory if it does not exists
  454. func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
  455. // we need a local directory for temporary files
  456. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
  457. return osFs.CheckRootPath(username, uid, gid)
  458. }
  459. // ScanRootDirContents returns the number of files contained in the bucket,
  460. // and their size
  461. func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
  462. numFiles := 0
  463. size := int64(0)
  464. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  465. defer cancelFn()
  466. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  467. Bucket: aws.String(fs.config.Bucket),
  468. Prefix: aws.String(fs.config.KeyPrefix),
  469. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  470. for _, fileObject := range page.Contents {
  471. isDir := strings.HasSuffix(*fileObject.Key, "/")
  472. if isDir && *fileObject.Size == 0 {
  473. continue
  474. }
  475. numFiles++
  476. size += *fileObject.Size
  477. }
  478. return true
  479. })
  480. metrics.S3ListObjectsCompleted(err)
  481. return numFiles, size, err
  482. }
  483. // GetDirSize returns the number of files and the size for a folder
  484. // including any subfolders
  485. func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
  486. return 0, 0, ErrVfsUnsupported
  487. }
  488. // GetAtomicUploadPath returns the path to use for an atomic upload.
  489. // S3 uploads are already atomic, we never call this method for S3
  490. func (*S3Fs) GetAtomicUploadPath(name string) string {
  491. return ""
  492. }
  493. // GetRelativePath returns the path for a file relative to the user's home dir.
  494. // This is the path as seen by SFTPGo users
  495. func (fs *S3Fs) GetRelativePath(name string) string {
  496. rel := path.Clean(name)
  497. if rel == "." {
  498. rel = ""
  499. }
  500. if !strings.HasPrefix(rel, "/") {
  501. return "/" + rel
  502. }
  503. if fs.config.KeyPrefix != "" {
  504. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  505. rel = "/"
  506. }
  507. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  508. }
  509. return rel
  510. }
  511. // Walk walks the file tree rooted at root, calling walkFn for each file or
  512. // directory in the tree, including root. The result are unordered
  513. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  514. prefix := ""
  515. if root != "/" && root != "." {
  516. prefix = strings.TrimPrefix(root, "/")
  517. if !strings.HasSuffix(prefix, "/") {
  518. prefix += "/"
  519. }
  520. }
  521. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  522. defer cancelFn()
  523. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  524. Bucket: aws.String(fs.config.Bucket),
  525. Prefix: aws.String(prefix),
  526. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  527. for _, fileObject := range page.Contents {
  528. objectSize := *fileObject.Size
  529. objectModTime := *fileObject.LastModified
  530. isDir := strings.HasSuffix(*fileObject.Key, "/")
  531. name := path.Clean(*fileObject.Key)
  532. if name == "/" || name == "." {
  533. continue
  534. }
  535. err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil)
  536. if err != nil {
  537. return false
  538. }
  539. }
  540. return true
  541. })
  542. metrics.S3ListObjectsCompleted(err)
  543. walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
  544. return err
  545. }
  546. // Join joins any number of path elements into a single path
  547. func (*S3Fs) Join(elem ...string) string {
  548. return path.Join(elem...)
  549. }
  550. // HasVirtualFolders returns true if folders are emulated
  551. func (*S3Fs) HasVirtualFolders() bool {
  552. return true
  553. }
  554. // ResolvePath returns the matching filesystem path for the specified virtual path
  555. func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
  556. if !path.IsAbs(virtualPath) {
  557. virtualPath = path.Clean("/" + virtualPath)
  558. }
  559. return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil
  560. }
  561. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  562. result := strings.TrimPrefix(*name, prefix)
  563. isDir := strings.HasSuffix(result, "/")
  564. if isDir {
  565. result = strings.TrimSuffix(result, "/")
  566. }
  567. if strings.Contains(result, "/") {
  568. i := strings.Index(result, "/")
  569. isDir = true
  570. result = result[:i]
  571. }
  572. return result, isDir
  573. }
  574. func (fs *S3Fs) isEqual(s3Key *string, virtualName string) bool {
  575. if *s3Key == virtualName {
  576. return true
  577. }
  578. if "/"+*s3Key == virtualName {
  579. return true
  580. }
  581. if "/"+*s3Key == virtualName+"/" {
  582. return true
  583. }
  584. return false
  585. }
  586. func (fs *S3Fs) checkIfBucketExists() error {
  587. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  588. defer cancelFn()
  589. _, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
  590. Bucket: aws.String(fs.config.Bucket),
  591. })
  592. metrics.S3HeadBucketCompleted(err)
  593. return err
  594. }
  595. func (fs *S3Fs) hasContents(name string) (bool, error) {
  596. prefix := ""
  597. if name != "/" && name != "." {
  598. prefix = strings.TrimPrefix(name, "/")
  599. if !strings.HasSuffix(prefix, "/") {
  600. prefix += "/"
  601. }
  602. }
  603. maxResults := int64(2)
  604. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  605. defer cancelFn()
  606. results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
  607. Bucket: aws.String(fs.config.Bucket),
  608. Prefix: aws.String(prefix),
  609. MaxKeys: &maxResults,
  610. })
  611. metrics.S3ListObjectsCompleted(err)
  612. if err != nil {
  613. return false, err
  614. }
  615. // MinIO returns no contents while S3 returns 1 object
  616. // with the key equal to the prefix for empty directories
  617. for _, obj := range results.Contents {
  618. name, _ := fs.resolve(obj.Key, prefix)
  619. if name == "" || name == "/" {
  620. continue
  621. }
  622. return true, nil
  623. }
  624. return false, nil
  625. }
  626. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  627. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  628. defer cancelFn()
  629. obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
  630. Bucket: aws.String(fs.config.Bucket),
  631. Key: aws.String(name),
  632. })
  633. metrics.S3HeadObjectCompleted(err)
  634. return obj, err
  635. }
  636. // GetMimeType returns the content type
  637. func (fs *S3Fs) GetMimeType(name string) (string, error) {
  638. obj, err := fs.headObject(name)
  639. if err != nil {
  640. return "", err
  641. }
  642. return *obj.ContentType, err
  643. }