s3fs.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685
  1. // +build !nos3
  2. package vfs
  3. import (
  4. "context"
  5. "fmt"
  6. "mime"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/credentials"
  16. "github.com/aws/aws-sdk-go/aws/session"
  17. "github.com/aws/aws-sdk-go/service/s3"
  18. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  19. "github.com/eikenb/pipeat"
  20. "github.com/pkg/sftp"
  21. "github.com/drakkan/sftpgo/logger"
  22. "github.com/drakkan/sftpgo/metrics"
  23. "github.com/drakkan/sftpgo/utils"
  24. "github.com/drakkan/sftpgo/version"
  25. )
  26. // S3Fs is a Fs implementation for AWS S3 compatible object storages
  27. type S3Fs struct {
  28. connectionID string
  29. localTempDir string
  30. // if not empty this fs is mouted as virtual folder in the specified path
  31. mountPath string
  32. config *S3FsConfig
  33. svc *s3.S3
  34. ctxTimeout time.Duration
  35. ctxLongTimeout time.Duration
  36. }
  37. func init() {
  38. version.AddFeature("+s3")
  39. }
  40. // NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
  41. // object storage
  42. func NewS3Fs(connectionID, localTempDir, mountPath string, config S3FsConfig) (Fs, error) {
  43. if localTempDir == "" {
  44. if tempPath != "" {
  45. localTempDir = tempPath
  46. } else {
  47. localTempDir = filepath.Clean(os.TempDir())
  48. }
  49. }
  50. fs := &S3Fs{
  51. connectionID: connectionID,
  52. localTempDir: localTempDir,
  53. mountPath: mountPath,
  54. config: &config,
  55. ctxTimeout: 30 * time.Second,
  56. ctxLongTimeout: 300 * time.Second,
  57. }
  58. if err := fs.config.Validate(); err != nil {
  59. return fs, err
  60. }
  61. awsConfig := aws.NewConfig()
  62. if fs.config.Region != "" {
  63. awsConfig.WithRegion(fs.config.Region)
  64. }
  65. if !fs.config.AccessSecret.IsEmpty() {
  66. if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
  67. return fs, err
  68. }
  69. awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), "")
  70. }
  71. if fs.config.Endpoint != "" {
  72. awsConfig.Endpoint = aws.String(fs.config.Endpoint)
  73. awsConfig.S3ForcePathStyle = aws.Bool(true)
  74. }
  75. if fs.config.UploadPartSize == 0 {
  76. fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
  77. } else {
  78. fs.config.UploadPartSize *= 1024 * 1024
  79. }
  80. if fs.config.UploadConcurrency == 0 {
  81. fs.config.UploadConcurrency = 2
  82. }
  83. sessOpts := session.Options{
  84. Config: *awsConfig,
  85. SharedConfigState: session.SharedConfigEnable,
  86. }
  87. sess, err := session.NewSessionWithOptions(sessOpts)
  88. if err != nil {
  89. return fs, err
  90. }
  91. fs.svc = s3.New(sess)
  92. return fs, nil
  93. }
  94. // Name returns the name for the Fs implementation
  95. func (fs *S3Fs) Name() string {
  96. return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
  97. }
  98. // ConnectionID returns the connection ID associated to this Fs implementation
  99. func (fs *S3Fs) ConnectionID() string {
  100. return fs.connectionID
  101. }
  102. // Stat returns a FileInfo describing the named file
  103. func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
  104. var result *FileInfo
  105. if name == "/" || name == "." {
  106. err := fs.checkIfBucketExists()
  107. if err != nil {
  108. return result, err
  109. }
  110. return NewFileInfo(name, true, 0, time.Now(), false), nil
  111. }
  112. if "/"+fs.config.KeyPrefix == name+"/" {
  113. return NewFileInfo(name, true, 0, time.Now(), false), nil
  114. }
  115. obj, err := fs.headObject(name)
  116. if err == nil {
  117. // a "dir" has a trailing "/" so we cannot have a directory here
  118. objSize := *obj.ContentLength
  119. objectModTime := *obj.LastModified
  120. return NewFileInfo(name, false, objSize, objectModTime, false), nil
  121. }
  122. if !fs.IsNotExist(err) {
  123. return result, err
  124. }
  125. // now check if this is a prefix (virtual directory)
  126. hasContents, err := fs.hasContents(name)
  127. if err == nil && hasContents {
  128. return NewFileInfo(name, true, 0, time.Now(), false), nil
  129. } else if err != nil {
  130. return nil, err
  131. }
  132. // the requested file may still be a directory as a zero bytes key
  133. // with a trailing forward slash (created using mkdir).
  134. // S3 doesn't return content type when listing objects, so we have
  135. // create "dirs" adding a trailing "/" to the key
  136. return fs.getStatForDir(name)
  137. }
  138. func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
  139. var result *FileInfo
  140. obj, err := fs.headObject(name + "/")
  141. if err != nil {
  142. return result, err
  143. }
  144. objSize := *obj.ContentLength
  145. objectModTime := *obj.LastModified
  146. return NewFileInfo(name, true, objSize, objectModTime, false), nil
  147. }
  148. // Lstat returns a FileInfo describing the named file
  149. func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
  150. return fs.Stat(name)
  151. }
  152. // Open opens the named file for reading
  153. func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
  154. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  155. if err != nil {
  156. return nil, nil, nil, err
  157. }
  158. ctx, cancelFn := context.WithCancel(context.Background())
  159. downloader := s3manager.NewDownloaderWithClient(fs.svc)
  160. var streamRange *string
  161. if offset > 0 {
  162. streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
  163. }
  164. go func() {
  165. defer cancelFn()
  166. n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
  167. Bucket: aws.String(fs.config.Bucket),
  168. Key: aws.String(name),
  169. Range: streamRange,
  170. })
  171. w.CloseWithError(err) //nolint:errcheck
  172. fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
  173. metrics.S3TransferCompleted(n, 1, err)
  174. }()
  175. return nil, r, cancelFn, nil
  176. }
  177. // Create creates or opens the named file for writing
  178. func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
  179. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  180. if err != nil {
  181. return nil, nil, nil, err
  182. }
  183. p := NewPipeWriter(w)
  184. ctx, cancelFn := context.WithCancel(context.Background())
  185. uploader := s3manager.NewUploaderWithClient(fs.svc)
  186. go func() {
  187. defer cancelFn()
  188. key := name
  189. var contentType string
  190. if flag == -1 {
  191. contentType = dirMimeType
  192. } else {
  193. contentType = mime.TypeByExtension(path.Ext(name))
  194. }
  195. response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
  196. Bucket: aws.String(fs.config.Bucket),
  197. Key: aws.String(key),
  198. Body: r,
  199. StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
  200. ContentType: utils.NilIfEmpty(contentType),
  201. }, func(u *s3manager.Uploader) {
  202. u.Concurrency = fs.config.UploadConcurrency
  203. u.PartSize = fs.config.UploadPartSize
  204. })
  205. r.CloseWithError(err) //nolint:errcheck
  206. p.Done(err)
  207. fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, readed bytes: %v, err: %+v",
  208. name, response, r.GetReadedBytes(), err)
  209. metrics.S3TransferCompleted(r.GetReadedBytes(), 0, err)
  210. }()
  211. return nil, p, cancelFn, nil
  212. }
  213. // Rename renames (moves) source to target.
  214. // We don't support renaming non empty directories since we should
  215. // rename all the contents too and this could take long time: think
  216. // about directories with thousands of files, for each file we should
  217. // execute a CopyObject call.
  218. // TODO: rename does not work for files bigger than 5GB, implement
  219. // multipart copy or wait for this pull request to be merged:
  220. //
  221. // https://github.com/aws/aws-sdk-go/pull/2653
  222. //
  223. func (fs *S3Fs) Rename(source, target string) error {
  224. if source == target {
  225. return nil
  226. }
  227. fi, err := fs.Stat(source)
  228. if err != nil {
  229. return err
  230. }
  231. copySource := fs.Join(fs.config.Bucket, source)
  232. if fi.IsDir() {
  233. hasContents, err := fs.hasContents(source)
  234. if err != nil {
  235. return err
  236. }
  237. if hasContents {
  238. return fmt.Errorf("cannot rename non empty directory: %#v", source)
  239. }
  240. if !strings.HasSuffix(copySource, "/") {
  241. copySource += "/"
  242. }
  243. if !strings.HasSuffix(target, "/") {
  244. target += "/"
  245. }
  246. }
  247. var contentType string
  248. if fi.IsDir() {
  249. contentType = dirMimeType
  250. } else {
  251. contentType = mime.TypeByExtension(path.Ext(source))
  252. }
  253. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  254. defer cancelFn()
  255. _, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
  256. Bucket: aws.String(fs.config.Bucket),
  257. CopySource: aws.String(url.PathEscape(copySource)),
  258. Key: aws.String(target),
  259. StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
  260. ContentType: utils.NilIfEmpty(contentType),
  261. })
  262. metrics.S3CopyObjectCompleted(err)
  263. if err != nil {
  264. return err
  265. }
  266. return fs.Remove(source, fi.IsDir())
  267. }
  268. // Remove removes the named file or (empty) directory.
  269. func (fs *S3Fs) Remove(name string, isDir bool) error {
  270. if isDir {
  271. hasContents, err := fs.hasContents(name)
  272. if err != nil {
  273. return err
  274. }
  275. if hasContents {
  276. return fmt.Errorf("cannot remove non empty directory: %#v", name)
  277. }
  278. if !strings.HasSuffix(name, "/") {
  279. name += "/"
  280. }
  281. }
  282. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  283. defer cancelFn()
  284. _, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
  285. Bucket: aws.String(fs.config.Bucket),
  286. Key: aws.String(name),
  287. })
  288. metrics.S3DeleteObjectCompleted(err)
  289. return err
  290. }
  291. // Mkdir creates a new directory with the specified name and default permissions
  292. func (fs *S3Fs) Mkdir(name string) error {
  293. _, err := fs.Stat(name)
  294. if !fs.IsNotExist(err) {
  295. return err
  296. }
  297. if !strings.HasSuffix(name, "/") {
  298. name += "/"
  299. }
  300. _, w, _, err := fs.Create(name, -1)
  301. if err != nil {
  302. return err
  303. }
  304. return w.Close()
  305. }
  306. // MkdirAll does nothing, we don't have folder
  307. func (*S3Fs) MkdirAll(name string, uid int, gid int) error {
  308. return nil
  309. }
  310. // Symlink creates source as a symbolic link to target.
  311. func (*S3Fs) Symlink(source, target string) error {
  312. return ErrVfsUnsupported
  313. }
  314. // Readlink returns the destination of the named symbolic link
  315. func (*S3Fs) Readlink(name string) (string, error) {
  316. return "", ErrVfsUnsupported
  317. }
  318. // Chown changes the numeric uid and gid of the named file.
  319. func (*S3Fs) Chown(name string, uid int, gid int) error {
  320. return ErrVfsUnsupported
  321. }
  322. // Chmod changes the mode of the named file to mode.
  323. func (*S3Fs) Chmod(name string, mode os.FileMode) error {
  324. return ErrVfsUnsupported
  325. }
  326. // Chtimes changes the access and modification times of the named file.
  327. func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
  328. return ErrVfsUnsupported
  329. }
  330. // Truncate changes the size of the named file.
  331. // Truncate by path is not supported, while truncating an opened
  332. // file is handled inside base transfer
  333. func (*S3Fs) Truncate(name string, size int64) error {
  334. return ErrVfsUnsupported
  335. }
  336. // ReadDir reads the directory named by dirname and returns
  337. // a list of directory entries.
  338. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  339. var result []os.FileInfo
  340. // dirname must be already cleaned
  341. prefix := ""
  342. if dirname != "/" && dirname != "." {
  343. prefix = strings.TrimPrefix(dirname, "/")
  344. if !strings.HasSuffix(prefix, "/") {
  345. prefix += "/"
  346. }
  347. }
  348. prefixes := make(map[string]bool)
  349. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  350. defer cancelFn()
  351. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  352. Bucket: aws.String(fs.config.Bucket),
  353. Prefix: aws.String(prefix),
  354. Delimiter: aws.String("/"),
  355. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  356. for _, p := range page.CommonPrefixes {
  357. // prefixes have a trailing slash
  358. name, _ := fs.resolve(p.Prefix, prefix)
  359. if name == "" {
  360. continue
  361. }
  362. if _, ok := prefixes[name]; ok {
  363. continue
  364. }
  365. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  366. prefixes[name] = true
  367. }
  368. for _, fileObject := range page.Contents {
  369. objectSize := *fileObject.Size
  370. objectModTime := *fileObject.LastModified
  371. name, isDir := fs.resolve(fileObject.Key, prefix)
  372. if name == "" {
  373. continue
  374. }
  375. if isDir {
  376. if _, ok := prefixes[name]; ok {
  377. continue
  378. }
  379. prefixes[name] = true
  380. }
  381. result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
  382. }
  383. return true
  384. })
  385. metrics.S3ListObjectsCompleted(err)
  386. return result, err
  387. }
  388. // IsUploadResumeSupported returns true if resuming uploads is supported.
  389. // Resuming uploads is not supported on S3
  390. func (*S3Fs) IsUploadResumeSupported() bool {
  391. return false
  392. }
  393. // IsAtomicUploadSupported returns true if atomic upload is supported.
  394. // S3 uploads are already atomic, we don't need to upload to a temporary
  395. // file
  396. func (*S3Fs) IsAtomicUploadSupported() bool {
  397. return false
  398. }
  399. // IsNotExist returns a boolean indicating whether the error is known to
  400. // report that a file or directory does not exist
  401. func (*S3Fs) IsNotExist(err error) bool {
  402. if err == nil {
  403. return false
  404. }
  405. if aerr, ok := err.(awserr.Error); ok {
  406. if aerr.Code() == s3.ErrCodeNoSuchKey {
  407. return true
  408. }
  409. if aerr.Code() == s3.ErrCodeNoSuchBucket {
  410. return true
  411. }
  412. }
  413. if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
  414. if multierr.Code() == s3.ErrCodeNoSuchKey {
  415. return true
  416. }
  417. if multierr.Code() == s3.ErrCodeNoSuchBucket {
  418. return true
  419. }
  420. }
  421. return strings.Contains(err.Error(), "404")
  422. }
  423. // IsPermission returns a boolean indicating whether the error is known to
  424. // report that permission is denied.
  425. func (*S3Fs) IsPermission(err error) bool {
  426. if err == nil {
  427. return false
  428. }
  429. return strings.Contains(err.Error(), "403")
  430. }
  431. // IsNotSupported returns true if the error indicate an unsupported operation
  432. func (*S3Fs) IsNotSupported(err error) bool {
  433. if err == nil {
  434. return false
  435. }
  436. return err == ErrVfsUnsupported
  437. }
  438. // CheckRootPath creates the specified local root directory if it does not exists
  439. func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
  440. // we need a local directory for temporary files
  441. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
  442. return osFs.CheckRootPath(username, uid, gid)
  443. }
  444. // ScanRootDirContents returns the number of files contained in the bucket,
  445. // and their size
  446. func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
  447. numFiles := 0
  448. size := int64(0)
  449. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  450. defer cancelFn()
  451. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  452. Bucket: aws.String(fs.config.Bucket),
  453. Prefix: aws.String(fs.config.KeyPrefix),
  454. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  455. for _, fileObject := range page.Contents {
  456. isDir := strings.HasSuffix(*fileObject.Key, "/")
  457. if isDir && *fileObject.Size == 0 {
  458. continue
  459. }
  460. numFiles++
  461. size += *fileObject.Size
  462. }
  463. return true
  464. })
  465. metrics.S3ListObjectsCompleted(err)
  466. return numFiles, size, err
  467. }
  468. // GetDirSize returns the number of files and the size for a folder
  469. // including any subfolders
  470. func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
  471. return 0, 0, ErrVfsUnsupported
  472. }
  473. // GetAtomicUploadPath returns the path to use for an atomic upload.
  474. // S3 uploads are already atomic, we never call this method for S3
  475. func (*S3Fs) GetAtomicUploadPath(name string) string {
  476. return ""
  477. }
  478. // GetRelativePath returns the path for a file relative to the user's home dir.
  479. // This is the path as seen by SFTPGo users
  480. func (fs *S3Fs) GetRelativePath(name string) string {
  481. rel := path.Clean(name)
  482. if rel == "." {
  483. rel = ""
  484. }
  485. if !path.IsAbs(rel) {
  486. return "/" + rel
  487. }
  488. if fs.config.KeyPrefix != "" {
  489. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  490. rel = "/"
  491. }
  492. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  493. }
  494. if fs.mountPath != "" {
  495. rel = path.Join(fs.mountPath, rel)
  496. }
  497. return rel
  498. }
  499. // Walk walks the file tree rooted at root, calling walkFn for each file or
  500. // directory in the tree, including root. The result are unordered
  501. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  502. prefix := ""
  503. if root != "/" && root != "." {
  504. prefix = strings.TrimPrefix(root, "/")
  505. if !strings.HasSuffix(prefix, "/") {
  506. prefix += "/"
  507. }
  508. }
  509. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  510. defer cancelFn()
  511. err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
  512. Bucket: aws.String(fs.config.Bucket),
  513. Prefix: aws.String(prefix),
  514. }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  515. for _, fileObject := range page.Contents {
  516. objectSize := *fileObject.Size
  517. objectModTime := *fileObject.LastModified
  518. isDir := strings.HasSuffix(*fileObject.Key, "/")
  519. name := path.Clean(*fileObject.Key)
  520. if name == "/" || name == "." {
  521. continue
  522. }
  523. err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil)
  524. if err != nil {
  525. return false
  526. }
  527. }
  528. return true
  529. })
  530. metrics.S3ListObjectsCompleted(err)
  531. walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
  532. return err
  533. }
  534. // Join joins any number of path elements into a single path
  535. func (*S3Fs) Join(elem ...string) string {
  536. return path.Join(elem...)
  537. }
  538. // HasVirtualFolders returns true if folders are emulated
  539. func (*S3Fs) HasVirtualFolders() bool {
  540. return true
  541. }
  542. // ResolvePath returns the matching filesystem path for the specified virtual path
  543. func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
  544. if fs.mountPath != "" {
  545. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  546. }
  547. if !path.IsAbs(virtualPath) {
  548. virtualPath = path.Clean("/" + virtualPath)
  549. }
  550. return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil
  551. }
  552. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  553. result := strings.TrimPrefix(*name, prefix)
  554. isDir := strings.HasSuffix(result, "/")
  555. if isDir {
  556. result = strings.TrimSuffix(result, "/")
  557. }
  558. if strings.Contains(result, "/") {
  559. i := strings.Index(result, "/")
  560. isDir = true
  561. result = result[:i]
  562. }
  563. return result, isDir
  564. }
  565. func (fs *S3Fs) checkIfBucketExists() error {
  566. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  567. defer cancelFn()
  568. _, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
  569. Bucket: aws.String(fs.config.Bucket),
  570. })
  571. metrics.S3HeadBucketCompleted(err)
  572. return err
  573. }
  574. func (fs *S3Fs) hasContents(name string) (bool, error) {
  575. prefix := ""
  576. if name != "/" && name != "." {
  577. prefix = strings.TrimPrefix(name, "/")
  578. if !strings.HasSuffix(prefix, "/") {
  579. prefix += "/"
  580. }
  581. }
  582. maxResults := int64(2)
  583. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  584. defer cancelFn()
  585. results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
  586. Bucket: aws.String(fs.config.Bucket),
  587. Prefix: aws.String(prefix),
  588. MaxKeys: &maxResults,
  589. })
  590. metrics.S3ListObjectsCompleted(err)
  591. if err != nil {
  592. return false, err
  593. }
  594. // MinIO returns no contents while S3 returns 1 object
  595. // with the key equal to the prefix for empty directories
  596. for _, obj := range results.Contents {
  597. name, _ := fs.resolve(obj.Key, prefix)
  598. if name == "" || name == "/" {
  599. continue
  600. }
  601. return true, nil
  602. }
  603. return false, nil
  604. }
  605. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  606. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  607. defer cancelFn()
  608. obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
  609. Bucket: aws.String(fs.config.Bucket),
  610. Key: aws.String(name),
  611. })
  612. metrics.S3HeadObjectCompleted(err)
  613. return obj, err
  614. }
  615. // GetMimeType returns the content type
  616. func (fs *S3Fs) GetMimeType(name string) (string, error) {
  617. obj, err := fs.headObject(name)
  618. if err != nil {
  619. return "", err
  620. }
  621. return *obj.ContentType, err
  622. }
  623. // Close closes the fs
  624. func (*S3Fs) Close() error {
  625. return nil
  626. }
  627. // GetAvailableDiskSize return the available size for the specified path
  628. func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
  629. return nil, ErrStorageSizeUnavailable
  630. }