s3fs.go

//go:build !nos3
// +build !nos3

package vfs

import (
	"context"
	"fmt"
	"mime"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/logger"
	"github.com/drakkan/sftpgo/v2/metric"
	"github.com/drakkan/sftpgo/v2/plugin"
	"github.com/drakkan/sftpgo/v2/util"
	"github.com/drakkan/sftpgo/v2/version"
)

// using this mime type for directories improves compatibility with s3fs-fuse
const s3DirMimeType = "application/x-directory"

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty this fs is mounted as a virtual folder in the specified path
	mountPath      string
	config         *S3FsConfig
	svc            *s3.S3
	ctxTimeout     time.Duration
	ctxLongTimeout time.Duration
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &S3Fs{
		connectionID:   connectionID,
		localTempDir:   localTempDir,
		mountPath:      mountPath,
		config:         &config,
		ctxTimeout:     30 * time.Second,
		ctxLongTimeout: 300 * time.Second,
	}
	if err := fs.config.Validate(); err != nil {
		return fs, err
	}
	awsConfig := aws.NewConfig()
	if fs.config.Region != "" {
		awsConfig.WithRegion(fs.config.Region)
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), "")
	}
	if fs.config.Endpoint != "" {
		awsConfig.Endpoint = aws.String(fs.config.Endpoint)
	}
	if fs.config.ForcePathStyle {
		awsConfig.S3ForcePathStyle = aws.Bool(true)
	}
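	// the configured part sizes are expressed in MB and converted to bytes here,
	// a value of zero falls back to the s3manager defaults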
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
	} else {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = s3manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = s3manager.DefaultDownloadPartSize
	} else {
		fs.config.DownloadPartSize *= 1024 * 1024
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = s3manager.DefaultDownloadConcurrency
	}
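	// SharedConfigEnable also loads the shared AWS config file, so region and
	// credentials can come from the environment or shared profiles when they
	// are not set explicitly above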
	sessOpts := session.Options{
		Config:            *awsConfig,
		SharedConfigState: session.SharedConfigEnable,
	}
	sess, err := session.NewSessionWithOptions(sessOpts)
	if err != nil {
		return fs, err
	}
	fs.svc = s3.New(sess)
	return fs, nil
}

// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "/" || name == "." {
		err := fs.checkIfBucketExists()
		if err != nil {
			return result, err
		}
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	if "/"+fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// a "dir" has a trailing "/" so we cannot have a directory here
		objSize := *obj.ContentLength
		objectModTime := *obj.LastModified
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, false, objSize, objectModTime, false))
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory as a zero bytes key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have
	// to create "dirs" by adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	objSize := *obj.ContentLength
	objectModTime := *obj.LastModified
	return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, objSize, objectModTime, false))
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := s3manager.NewDownloaderWithClient(fs.svc)
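	// when downloading from the beginning of the object, DownloadPartMaxTime
	// limits the time allowed for each HTTP request made by the downloader:
	// every request gets its own timeout context, and a goroutine releases it
	// once the whole download context is done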
	if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
		downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
			chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.DownloadPartMaxTime)*time.Second)
			r.SetContext(chunkCtx)

			go func() {
				<-ctx.Done()
				cancel()
			}()
		})
	}
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
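	// the object is streamed in a background goroutine that writes into the pipe;
	// the reader end is returned to the caller and any download error is
	// propagated through CloseWithError. For resumed reads (offset > 0) the Range
	// header requests only the remaining bytes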
	go func() {
		defer cancelFn()

		n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		}, func(d *s3manager.Downloader) {
			d.Concurrency = fs.config.DownloadConcurrency
			d.PartSize = fs.config.DownloadPartSize
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}

// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := s3manager.NewUploaderWithClient(fs.svc)
	if fs.config.UploadPartMaxTime > 0 {
		uploader.RequestOptions = append(uploader.RequestOptions, func(r *request.Request) {
			chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.UploadPartMaxTime)*time.Second)
			r.SetContext(chunkCtx)

			go func() {
				<-ctx.Done()
				cancel()
			}()
		})
	}
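	// the upload runs in a background goroutine reading from the pipe: the
	// caller writes into the returned PipeWriter and the final result is
	// delivered via p.Done()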
	go func() {
		defer cancelFn()

		key := name
		var contentType string
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(key),
			Body:         r,
			ACL:          util.NilIfEmpty(fs.config.ACL),
			StorageClass: util.NilIfEmpty(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		}, func(u *s3manager.Uploader) {
			u.Concurrency = fs.config.UploadConcurrency
			u.PartSize = fs.config.UploadPartSize
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, acl: %#v, response: %v, read bytes: %v, err: %+v",
			name, fs.config.ACL, response, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we would have to
// rename all the contents too and this could take a long time: think
// about directories with thousands of files; for each file we would
// need to execute a CopyObject call.
// TODO: rename does not work for files bigger than 5GB, implement
// multipart copy or wait for this pull request to be merged:
//
// https://github.com/aws/aws-sdk-go/pull/2653
func (fs *S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	copySource := fs.Join(fs.config.Bucket, source)
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non empty directory: %#v", source)
		}
		if !strings.HasSuffix(copySource, "/") {
			copySource += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	var contentType string
	if fi.IsDir() {
		contentType = s3DirMimeType
	} else {
		contentType = mime.TypeByExtension(path.Ext(source))
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(pathEscape(copySource)),
		Key:          aws.String(target),
		StorageClass: util.NilIfEmpty(fs.config.StorageClass),
		ACL:          util.NilIfEmpty(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		metric.S3CopyObjectCompleted(err)
		return err
	}
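	// wait until the copied object is available before removing the source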
	err = fs.svc.WaitUntilObjectExistsWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(target),
	})
	metric.S3CopyObjectCompleted(err)
	if err != nil {
		return err
	}
	if plugin.Handler.HasMetadater() {
		if !fi.IsDir() {
			err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %v",
					source, target, err)
			}
		}
	}
	return fs.Remove(source, fi.IsDir())
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %#v", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
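	// a directory is stored as a zero byte object whose key ends with "/",
	// so removing a directory just deletes that placeholder key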
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	_, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %v", name, errMetadata)
		}
	}
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
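	// create the directory placeholder: a zero byte object whose key ends with
	// "/". The -1 flag tells Create to set the s3DirMimeType content type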
	_, w, _, err := fs.Create(name, -1)
	if err != nil {
		return err
	}
	return w.Close()
}

// MkdirAll does nothing, we don't have folders
func (*S3Fs) MkdirAll(name string, uid int, gid int) error {
	return nil
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(name string, atime, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside the base transfer
func (*S3Fs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := ""
	if dirname != "/" && dirname != "." {
		prefix = strings.TrimPrefix(dirname, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
	if err != nil {
		return result, err
	}
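	// with Delimiter set to "/" the listing returns sub-directories as
	// CommonPrefixes and objects as Contents; the prefixes map avoids duplicate
	// entries when a name appears both as a prefix and as a "dir" placeholder key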
	prefixes := make(map[string]bool)

	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	err = fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, p := range page.CommonPrefixes {
			// prefixes have a trailing slash
			name, _ := fs.resolve(p.Prefix, prefix)
			if name == "" {
				continue
			}
			if _, ok := prefixes[name]; ok {
				continue
			}
			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
			prefixes[name] = true
		}
		for _, fileObject := range page.Contents {
			objectSize := *fileObject.Size
			objectModTime := *fileObject.LastModified
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			if isDir {
				if _, ok := prefixes[name]; ok {
					continue
				}
				prefixes[name] = true
			}
			if t, ok := modTimes[name]; ok {
				objectModTime = util.GetTimeFromMsecSinceEpoch(t)
			}
			result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
	return result, err
}

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	if aerr, ok := err.(awserr.Error); ok {
		if aerr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if aerr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
	if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
		if multierr.Code() == s3.ErrCodeNoSuchKey {
			return true
		}
		if multierr.Code() == s3.ErrCodeNoSuchBucket {
			return true
		}
	}
	return strings.Contains(err.Error(), "404")
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "403")
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return err == ErrVfsUnsupported
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	numFiles := 0
	size := int64(0)

	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
	defer cancelFn()

	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(fs.config.KeyPrefix),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(*fileObject.Key, "/")
			if isDir && *fileObject.Size == 0 {
				continue
			}
			numFiles++
			size += *fileObject.Size
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
	return numFiles, size, err
}

func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
	fileNames := make(map[string]bool)
	prefix := ""
	if fsPrefix != "/" {
		prefix = strings.TrimPrefix(fsPrefix, "/")
	}

	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name != "" && !isDir {
				fileNames[name] = true
			}
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
	if err != nil {
		fsLog(fs, logger.LevelError, "unable to get content for prefix %#v: %v", prefix, err)
		return nil, err
	}
	return fileNames, err
}

// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		return "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := ""
	if root != "/" && root != "." {
		prefix = strings.TrimPrefix(root, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
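	// the listing is flat (no Delimiter), so every object under the prefix is
	// visited; the root itself is reported to walkFn after the listing completes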
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, fileObject := range page.Contents {
			objectSize := *fileObject.Size
			objectModTime := *fileObject.LastModified
			isDir := strings.HasSuffix(*fileObject.Key, "/")
			name := path.Clean(*fileObject.Key)
			if name == "/" || name == "." {
				continue
			}
			err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil)
			if err != nil {
				return false
			}
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
	walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
	return err
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return path.Join(elem...)
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil
}
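
// resolve strips the listing prefix from an object key and returns the first
// remaining path component and whether it refers to a "directory" (the key has
// a trailing "/" or contains further path components)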
func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(*name, prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	if strings.Contains(result, "/") {
		i := strings.Index(result, "/")
		isDir = true
		result = result[:i]
	}
	return result, isDir
}

func (fs *S3Fs) checkIfBucketExists() error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	_, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(fs.config.Bucket),
	})
	metric.S3HeadBucketCompleted(err)
	return err
}

func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := ""
	if name != "/" && name != "." {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
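	// ask for at most two keys: for an empty "directory" S3 may return a single
	// object whose key equals the prefix itself, so a second key is enough to
	// tell whether there is any real content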
	maxResults := int64(2)
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxResults,
	})
	metric.S3ListObjectsCompleted(err)
	if err != nil {
		return false, err
	}
	// MinIO returns no contents while S3 returns 1 object
	// with the key equal to the prefix for empty directories
	for _, obj := range results.Contents {
		name, _ := fs.resolve(obj.Key, prefix)
		if name == "" || name == "/" {
			continue
		}
		return true, nil
	}
	return false, nil
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return *obj.ContentType, err
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) getStorageID() string {
	if fs.config.Endpoint != "" {
		if !strings.HasSuffix(fs.config.Endpoint, "/") {
			return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
		}
		return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
	}
	return fmt.Sprintf("s3://%v", fs.config.Bucket)
}

// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483; the code below is copied from rclone
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}