s3fs.go

// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3
// +build !nos3

package vfs

import (
    "context"
    "crypto/tls"
    "errors"
    "fmt"
    "io"
    "mime"
    "net"
    "net/http"
    "net/url"
    "os"
    "path"
    "path/filepath"
    "sort"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
    "github.com/aws/aws-sdk-go-v2/service/sts"
    "github.com/eikenb/pipeat"
    "github.com/pkg/sftp"

    "github.com/drakkan/sftpgo/v2/internal/logger"
    "github.com/drakkan/sftpgo/v2/internal/metric"
    "github.com/drakkan/sftpgo/v2/internal/util"
    "github.com/drakkan/sftpgo/v2/internal/version"
)

const (
    // using this mime type for directories improves compatibility with s3fs-fuse
    s3DirMimeType         = "application/x-directory"
    s3TransferBufferSize  = 256 * 1024
    s3CopyObjectThreshold = 500 * 1024 * 1024
)

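// Note: objects larger than s3CopyObjectThreshold are copied server-side with
// a multipart copy (see copyFileInternal below); smaller objects use a single
// CopyObject call. The 500 MB threshold stays well below the 5 GiB limit AWS
// enforces on a single CopyObject request.
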
var (
    s3DirMimeTypes    = []string{s3DirMimeType, "httpd/unix-directory"}
    s3DefaultPageSize = int32(5000)
)

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
    connectionID string
    localTempDir string
    // if not empty this fs is mounted as a virtual folder in the specified path
    mountPath  string
    config     *S3FsConfig
    svc        *s3.Client
    ctxTimeout time.Duration
}

func init() {
    version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
    if localTempDir == "" {
        localTempDir = getLocalTempDir()
    }
    fs := &S3Fs{
        connectionID: connectionID,
        localTempDir: localTempDir,
        mountPath:    getMountPath(mountPath),
        config:       &s3Config,
        ctxTimeout:   30 * time.Second,
    }
    if err := fs.config.validate(); err != nil {
        return fs, err
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(
        getAWSHTTPClient(0, 30*time.Second, fs.config.SkipTLSVerify)),
    )
    if err != nil {
        return fs, fmt.Errorf("unable to get AWS config: %w", err)
    }
    if fs.config.Region != "" {
        awsConfig.Region = fs.config.Region
    }
    if !fs.config.AccessSecret.IsEmpty() {
        if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
            return fs, err
        }
        awsConfig.Credentials = aws.NewCredentialsCache(
            credentials.NewStaticCredentialsProvider(
                fs.config.AccessKey,
                fs.config.AccessSecret.GetPayload(),
                fs.config.SessionToken),
        )
    }
    fs.setConfigDefaults()
    if fs.config.RoleARN != "" {
        client := sts.NewFromConfig(awsConfig)
        creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
        awsConfig.Credentials = creds
    }
    fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
        o.AppID = version.GetVersionHash()
        o.UsePathStyle = fs.config.ForcePathStyle
        if fs.config.Endpoint != "" {
            o.BaseEndpoint = aws.String(fs.config.Endpoint)
        }
    })
    return fs, nil
}

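// A minimal usage sketch; the S3FsConfig literal is illustrative, see the
// config definition elsewhere in this package for the full field set:
//
//	cfg := S3FsConfig{ /* bucket, region, credentials, ... */ }
//	s3fs, err := NewS3Fs("connection-id", "", "", cfg)
//	if err != nil {
//		// invalid config or AWS setup failure
//	}
//	defer s3fs.Close()
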
// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
    return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
    return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
    var result *FileInfo
    if name == "" || name == "/" || name == "." {
        return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
    }
    if fs.config.KeyPrefix == name+"/" {
        return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
    }
    obj, err := fs.headObject(name)
    if err == nil {
        // Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
        // So we check some common content types to detect if this is a "directory".
        isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
        if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
            _, err = fs.headObject(name + "/")
            isDir = err == nil
        }
        return NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
    }
    if !fs.IsNotExist(err) {
        return result, err
    }
    // now check if this is a prefix (virtual directory)
    hasContents, err := fs.hasContents(name)
    if err == nil && hasContents {
        return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
    } else if err != nil {
        return nil, err
    }
    // the requested file may still be a directory stored as a zero-byte key
    // with a trailing forward slash (created using mkdir).
    // S3 doesn't return the content type when listing objects, so we have to
    // create "dirs" by adding a trailing "/" to the key
    return fs.getStatForDir(name)
}

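// getStatForDir returns the FileInfo for a directory stored as a zero-byte
// object whose key ends with "/", the convention used by mkdirInternal.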
func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
    var result *FileInfo
    obj, err := fs.headObject(name + "/")
    if err != nil {
        return result, err
    }
    return NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
    return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) {
    r, w, err := pipeat.PipeInDir(fs.localTempDir)
    if err != nil {
        return nil, nil, nil, err
    }
    p := NewPipeReader(r)
    if readMetadata > 0 {
        attrs, err := fs.headObject(name)
        if err != nil {
            r.Close()
            w.Close()
            return nil, nil, nil, err
        }
        p.setMetadata(attrs.Metadata)
    }
    ctx, cancelFn := context.WithCancel(context.Background())
    downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
        d.Concurrency = fs.config.DownloadConcurrency
        d.PartSize = fs.config.DownloadPartSize
        if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
            d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
                o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
                    fs.config.SkipTLSVerify)
            })
        }
    })
    var streamRange *string
    if offset > 0 {
        streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
    }
    go func() {
        defer cancelFn()
        n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
            Bucket: aws.String(fs.config.Bucket),
            Key:    aws.String(name),
            Range:  streamRange,
        })
        w.CloseWithError(err) //nolint:errcheck
        fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
        metric.S3TransferCompleted(n, 1, err)
    }()
    return nil, p, cancelFn, nil
}

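// Note: Open streams the object through a pipe. A background goroutine feeds
// the SDK download manager into the write end while the caller reads from the
// returned PipeReader; the returned cancel function aborts an in-flight
// download.
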
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
    if checks&CheckParentDir != 0 {
        _, err := fs.Stat(path.Dir(name))
        if err != nil {
            return nil, nil, nil, err
        }
    }
    r, w, err := pipeat.PipeInDir(fs.localTempDir)
    if err != nil {
        return nil, nil, nil, err
    }
    var p PipeWriter
    if checks&CheckResume != 0 {
        p = newPipeWriterAtOffset(w, 0)
    } else {
        p = NewPipeWriter(w)
    }
    ctx, cancelFn := context.WithCancel(context.Background())
    uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
        u.Concurrency = fs.config.UploadConcurrency
        u.PartSize = fs.config.UploadPartSize
        if fs.config.UploadPartMaxTime > 0 {
            u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
                o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond,
                    fs.config.SkipTLSVerify)
            })
        }
    })
    go func() {
        defer cancelFn()
        var contentType string
        if flag == -1 {
            contentType = s3DirMimeType
        } else {
            contentType = mime.TypeByExtension(path.Ext(name))
        }
        _, err := uploader.Upload(ctx, &s3.PutObjectInput{
            Bucket:       aws.String(fs.config.Bucket),
            Key:          aws.String(name),
            Body:         r,
            ACL:          types.ObjectCannedACL(fs.config.ACL),
            StorageClass: types.StorageClass(fs.config.StorageClass),
            ContentType:  util.NilIfEmpty(contentType),
        })
        r.CloseWithError(err) //nolint:errcheck
        p.Done(err)
        fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, read bytes: %d, err: %+v",
            name, fs.config.ACL, r.GetReadedBytes(), err)
        metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
    }()
    if checks&CheckResume != 0 {
        readCh := make(chan error, 1)
        go func() {
            n, err := fs.downloadToWriter(name, p)
            pw := p.(*pipeWriterAtOffset)
            pw.offset = 0
            pw.writeOffset = n
            readCh <- err
        }()
        err = <-readCh
        if err != nil {
            cancelFn()
            p.Close()
            fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
            return nil, nil, nil, err
        }
    }
    if uploadMode&4 != 0 {
        return nil, p, nil, nil
    }
    return nil, p, cancelFn, nil
}

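// Note: when the CheckResume flag is set, Create first downloads the existing
// object into the pipe writer (see downloadToWriter) and adjusts the write
// offset so the client can append to it: S3 itself has no append or resume
// primitive, so the object is re-uploaded in full.
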
// Rename renames (moves) source to target.
func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
    if source == target {
        return -1, -1, nil
    }
    _, err := fs.Stat(path.Dir(target))
    if err != nil {
        return -1, -1, err
    }
    fi, err := fs.Stat(source)
    if err != nil {
        return -1, -1, err
    }
    return fs.renameInternal(source, target, fi, 0)
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
    if isDir {
        hasContents, err := fs.hasContents(name)
        if err != nil {
            return err
        }
        if hasContents {
            return fmt.Errorf("cannot remove non-empty directory: %q", name)
        }
        if !strings.HasSuffix(name, "/") {
            name += "/"
        }
    }
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    _, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
        Bucket: aws.String(fs.config.Bucket),
        Key:    aws.String(name),
    })
    metric.S3DeleteObjectCompleted(err)
    return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
    _, err := fs.Stat(name)
    if !fs.IsNotExist(err) {
        return err
    }
    return fs.mkdirInternal(name)
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(_, _ string) error {
    return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(_ string) (string, error) {
    return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
    return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
    return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(_ string, _, _ time.Time, _ bool) error {
    return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
    return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) (DirLister, error) {
    // dirname must be already cleaned
    prefix := fs.getPrefix(dirname)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:    aws.String(fs.config.Bucket),
        Prefix:    aws.String(prefix),
        Delimiter: aws.String("/"),
        MaxKeys:   &s3DefaultPageSize,
    })
    return &s3DirLister{
        paginator: paginator,
        timeout:   fs.ctxTimeout,
        prefix:    prefix,
        prefixes:  make(map[string]bool),
    }, nil
}

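// Note: ReadDir is lazy, no request is sent here. Pages are fetched on
// demand, one ListObjectsV2 page at a time, by s3DirLister.Next.
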
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
    return false
}

// IsConditionalUploadResumeSupported returns whether resuming uploads is
// supported for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
    return size <= resumeMaxSize
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic; we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
    return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
    if err == nil {
        return false
    }
    var re *awshttp.ResponseError
    if errors.As(err, &re) {
        if re.Response != nil {
            return re.Response.StatusCode == http.StatusNotFound
        }
    }
    return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
    if err == nil {
        return false
    }
    var re *awshttp.ResponseError
    if errors.As(err, &re) {
        if re.Response != nil {
            return re.Response.StatusCode == http.StatusForbidden ||
                re.Response.StatusCode == http.StatusUnauthorized
        }
    }
    return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
    if err == nil {
        return false
    }
    return errors.Is(err, ErrVfsUnsupported)
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
    // we need a local directory for temporary files
    osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
    return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
    return fs.GetDirSize(fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
    prefix := fs.getPrefix(dirname)
    numFiles := 0
    size := int64(0)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:  aws.String(fs.config.Bucket),
        Prefix:  aws.String(prefix),
        MaxKeys: &s3DefaultPageSize,
    })
    for paginator.HasMorePages() {
        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
        defer cancelFn()
        page, err := paginator.NextPage(ctx)
        if err != nil {
            metric.S3ListObjectsCompleted(err)
            return numFiles, size, err
        }
        for _, fileObject := range page.Contents {
            isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
            objectSize := util.GetIntFromPointer(fileObject.Size)
            if isDir && objectSize == 0 {
                continue
            }
            numFiles++
            size += objectSize
        }
        fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
    }
    metric.S3ListObjectsCompleted(nil)
    return numFiles, size, nil
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic; this method is never called for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
    return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
    rel := path.Clean(name)
    if rel == "." {
        rel = ""
    }
    if !path.IsAbs(rel) {
        rel = "/" + rel
    }
    if fs.config.KeyPrefix != "" {
        if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
            rel = "/"
        }
        rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
    }
    if fs.mountPath != "" {
        rel = path.Join(fs.mountPath, rel)
    }
    return rel
}

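// For example, with KeyPrefix "folder/" and mountPath "/mnt", the object key
// "folder/file.txt" maps to the SFTPGo path "/mnt/file.txt".
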
// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
    prefix := fs.getPrefix(root)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:  aws.String(fs.config.Bucket),
        Prefix:  aws.String(prefix),
        MaxKeys: &s3DefaultPageSize,
    })
    for paginator.HasMorePages() {
        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
        defer cancelFn()
        page, err := paginator.NextPage(ctx)
        if err != nil {
            metric.S3ListObjectsCompleted(err)
            walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
            return err
        }
        for _, fileObject := range page.Contents {
            name, isDir := fs.resolve(fileObject.Key, prefix)
            if name == "" {
                continue
            }
            err := walkFn(util.GetStringFromPointer(fileObject.Key),
                NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size),
                    util.GetTimeFromPointer(fileObject.LastModified), false), nil)
            if err != nil {
                return err
            }
        }
    }
    metric.S3ListObjectsCompleted(nil)
    walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
    return nil
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
    return strings.TrimPrefix(path.Join(elem...), "/")
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
    return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
    if fs.mountPath != "" {
        virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
    }
    if !path.IsAbs(virtualPath) {
        virtualPath = path.Clean("/" + virtualPath)
    }
    return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

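// For example, with mountPath "/mnt" and KeyPrefix "folder/", the virtual
// path "/mnt/file.txt" resolves to the object key "folder/file.txt", the
// inverse of GetRelativePath.
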
// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) (int, int64, error) {
    numFiles := 1
    sizeDiff := srcSize
    attrs, err := fs.headObject(target)
    if err == nil {
        sizeDiff -= util.GetIntFromPointer(attrs.ContentLength)
        numFiles = 0
    } else {
        if !fs.IsNotExist(err) {
            return 0, 0, err
        }
    }
    if err := fs.copyFileInternal(source, target, srcSize); err != nil {
        return 0, 0, err
    }
    return numFiles, sizeDiff, nil
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
    result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
    isDir := strings.HasSuffix(result, "/")
    if isDir {
        result = strings.TrimSuffix(result, "/")
    }
    return result, isDir
}

func (fs *S3Fs) setConfigDefaults() {
    if fs.config.UploadPartSize == 0 {
        fs.config.UploadPartSize = manager.DefaultUploadPartSize
    } else {
        if fs.config.UploadPartSize < 1024*1024 {
            fs.config.UploadPartSize *= 1024 * 1024
        }
    }
    if fs.config.UploadConcurrency == 0 {
        fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
    }
    if fs.config.DownloadPartSize == 0 {
        fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
    } else {
        if fs.config.DownloadPartSize < 1024*1024 {
            fs.config.DownloadPartSize *= 1024 * 1024
        }
    }
    if fs.config.DownloadConcurrency == 0 {
        fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
    }
}

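// Note: part sizes smaller than 1 MiB are interpreted as MiB multipliers, so
// a configured UploadPartSize of 8 becomes 8 MiB.
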
func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
    contentType := mime.TypeByExtension(path.Ext(source))
    copySource := pathEscape(fs.Join(fs.config.Bucket, source))
    if fileSize > s3CopyObjectThreshold {
        fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
            source, fileSize)
        err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
        metric.S3CopyObjectCompleted(err)
        return err
    }
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    _, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
        Bucket:       aws.String(fs.config.Bucket),
        CopySource:   aws.String(copySource),
        Key:          aws.String(target),
        StorageClass: types.StorageClass(fs.config.StorageClass),
        ACL:          types.ObjectCannedACL(fs.config.ACL),
        ContentType:  util.NilIfEmpty(contentType),
    })
    metric.S3CopyObjectCompleted(err)
    return err
}

func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo, recursion int) (int, int64, error) {
    var numFiles int
    var filesSize int64
    if fi.IsDir() {
        if renameMode == 0 {
            hasContents, err := fs.hasContents(source)
            if err != nil {
                return numFiles, filesSize, err
            }
            if hasContents {
                return numFiles, filesSize, fmt.Errorf("%w: cannot rename non-empty directory: %q", ErrVfsUnsupported, source)
            }
        }
        if err := fs.mkdirInternal(target); err != nil {
            return numFiles, filesSize, err
        }
        if renameMode == 1 {
            files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion)
            numFiles += files
            filesSize += size
            if err != nil {
                return numFiles, filesSize, err
            }
        }
    } else {
        if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
            return numFiles, filesSize, err
        }
        numFiles++
        filesSize += fi.Size()
    }
    err := fs.Remove(source, fi.IsDir())
    if fs.IsNotExist(err) {
        err = nil
    }
    return numFiles, filesSize, err
}

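// Note: S3 has no server-side rename, so renameInternal copies the source to
// the target and then deletes the source. For directories, renameMode selects
// between refusing to rename non-empty directories (0) and a recursive
// copy-and-delete (1).
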
func (fs *S3Fs) mkdirInternal(name string) error {
    if !strings.HasSuffix(name, "/") {
        name += "/"
    }
    _, w, _, err := fs.Create(name, -1, 0)
    if err != nil {
        return err
    }
    return w.Close()
}

func (fs *S3Fs) hasContents(name string) (bool, error) {
    prefix := fs.getPrefix(name)
    maxKeys := int32(2)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:  aws.String(fs.config.Bucket),
        Prefix:  aws.String(prefix),
        MaxKeys: &maxKeys,
    })
    if paginator.HasMorePages() {
        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
        defer cancelFn()
        page, err := paginator.NextPage(ctx)
        metric.S3ListObjectsCompleted(err)
        if err != nil {
            return false, err
        }
        for _, obj := range page.Contents {
            name, _ := fs.resolve(obj.Key, prefix)
            if name == "" || name == "/" {
                continue
            }
            return true, nil
        }
        return false, nil
    }
    metric.S3ListObjectsCompleted(nil)
    return false, nil
}

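// Note: hasContents requests at most two keys and skips the directory
// placeholder itself, so it answers "is this prefix non-empty?" with a single
// cheap ListObjectsV2 call.
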
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
        Bucket:       aws.String(fs.config.Bucket),
        Key:          aws.String(target),
        StorageClass: types.StorageClass(fs.config.StorageClass),
        ACL:          types.ObjectCannedACL(fs.config.ACL),
        ContentType:  util.NilIfEmpty(contentType),
    })
    if err != nil {
        return fmt.Errorf("unable to create multipart copy request: %w", err)
    }
    uploadID := util.GetStringFromPointer(res.UploadId)
    if uploadID == "" {
        return errors.New("unable to get multipart copy upload ID")
    }
    // We use a 32 MB part size and copy 10 parts in parallel.
    // These values are arbitrary. We don't want to start too many goroutines.
    maxPartSize := int64(32 * 1024 * 1024)
    if fileSize > int64(100*1024*1024*1024) {
        maxPartSize = int64(500 * 1024 * 1024)
    }
    guard := make(chan struct{}, 10)
    finished := false
    var completedParts []types.CompletedPart
    var partMutex sync.Mutex
    var wg sync.WaitGroup
    var hasError atomic.Bool
    var errOnce sync.Once
    var copyError error
    var partNumber int32
    var offset int64
    opCtx, opCancel := context.WithCancel(context.Background())
    defer opCancel()
    for partNumber = 1; !finished; partNumber++ {
        start := offset
        end := offset + maxPartSize
        if end >= fileSize {
            end = fileSize
            finished = true
        }
        offset = end
        guard <- struct{}{}
        if hasError.Load() {
            fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
            break
        }
        wg.Add(1)
        go func(partNum int32, partStart, partEnd int64) {
            defer func() {
                <-guard
                wg.Done()
            }()
            innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
            defer innerCancelFn()
            partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
                Bucket:          aws.String(fs.config.Bucket),
                CopySource:      aws.String(source),
                Key:             aws.String(target),
                PartNumber:      &partNum,
                UploadId:        aws.String(uploadID),
                CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
            })
            if err != nil {
                errOnce.Do(func() {
                    fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
                    hasError.Store(true)
                    copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
                    opCancel()
                    abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
                    defer abortCancelFn()
                    _, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
                        Bucket:   aws.String(fs.config.Bucket),
                        Key:      aws.String(target),
                        UploadId: aws.String(uploadID),
                    })
                    if errAbort != nil {
                        fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
                    }
                })
                return
            }
            partMutex.Lock()
            completedParts = append(completedParts, types.CompletedPart{
                ETag:       partResp.CopyPartResult.ETag,
                PartNumber: &partNum,
            })
            partMutex.Unlock()
        }(partNumber, start, end)
    }
    wg.Wait()
    close(guard)
    if copyError != nil {
        return copyError
    }
    sort.Slice(completedParts, func(i, j int) bool {
        getPartNumber := func(number *int32) int32 {
            if number == nil {
                return 0
            }
            return *number
        }
        return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
    })
    completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer completeCancelFn()
    _, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
        Bucket:   aws.String(fs.config.Bucket),
        Key:      aws.String(target),
        UploadId: aws.String(uploadID),
        MultipartUpload: &types.CompletedMultipartUpload{
            Parts: completedParts,
        },
    })
    if err != nil {
        return fmt.Errorf("unable to complete multipart upload: %w", err)
    }
    return nil
}

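// Note: the part size grows from 32 MB to 500 MB for sources above 100 GB,
// presumably to respect S3's 10,000-part limit per multipart upload
// (500 MB * 10,000 covers the 5 TB S3 maximum object size). The buffered
// guard channel caps concurrent part copies at 10.
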
func (fs *S3Fs) getPrefix(name string) string {
    prefix := ""
    if name != "" && name != "." && name != "/" {
        prefix = strings.TrimPrefix(name, "/")
        if !strings.HasSuffix(prefix, "/") {
            prefix += "/"
        }
    }
    return prefix
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
        Bucket: aws.String(fs.config.Bucket),
        Key:    aws.String(name),
    })
    metric.S3HeadObjectCompleted(err)
    return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
    obj, err := fs.headObject(name)
    if err != nil {
        return "", err
    }
    return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
    return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
    return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
    fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
    ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
    defer cancelFn()
    downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
        d.Concurrency = fs.config.DownloadConcurrency
        d.PartSize = fs.config.DownloadPartSize
        if fs.config.DownloadPartMaxTime > 0 {
            d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
                o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
                    fs.config.SkipTLSVerify)
            })
        }
    })
    n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
        Bucket: aws.String(fs.config.Bucket),
        Key:    aws.String(name),
    })
    fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
        name, n, err)
    metric.S3TransferCompleted(n, 1, err)
    return n, err
}

type s3DirLister struct {
    baseDirLister
    paginator     *s3.ListObjectsV2Paginator
    timeout       time.Duration
    prefix        string
    prefixes      map[string]bool
    metricUpdated bool
}

func (l *s3DirLister) resolve(name *string) (string, bool) {
    result := strings.TrimPrefix(util.GetStringFromPointer(name), l.prefix)
    isDir := strings.HasSuffix(result, "/")
    if isDir {
        result = strings.TrimSuffix(result, "/")
    }
    return result, isDir
}

func (l *s3DirLister) Next(limit int) ([]os.FileInfo, error) {
    if limit <= 0 {
        return nil, errInvalidDirListerLimit
    }
    if len(l.cache) >= limit {
        return l.returnFromCache(limit), nil
    }
    if !l.paginator.HasMorePages() {
        if !l.metricUpdated {
            l.metricUpdated = true
            metric.S3ListObjectsCompleted(nil)
        }
        return l.returnFromCache(limit), io.EOF
    }
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(l.timeout))
    defer cancelFn()
    page, err := l.paginator.NextPage(ctx)
    if err != nil {
        metric.S3ListObjectsCompleted(err)
        return l.cache, err
    }
    for _, p := range page.CommonPrefixes {
        // prefixes have a trailing slash
        name, _ := l.resolve(p.Prefix)
        if name == "" {
            continue
        }
        if _, ok := l.prefixes[name]; ok {
            continue
        }
        l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
        l.prefixes[name] = true
    }
    for _, fileObject := range page.Contents {
        objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
        objectSize := util.GetIntFromPointer(fileObject.Size)
        name, isDir := l.resolve(fileObject.Key)
        if name == "" || name == "/" {
            continue
        }
        if isDir {
            if _, ok := l.prefixes[name]; ok {
                continue
            }
            l.prefixes[name] = true
        }
        l.cache = append(l.cache, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
    }
    return l.returnFromCache(limit), nil
}

func (l *s3DirLister) Close() error {
    return l.baseDirLister.Close()
}

func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration, skipTLSVerify bool) *awshttp.BuildableClient {
    c := awshttp.NewBuildableClient().
        WithDialerOptions(func(d *net.Dialer) {
            d.Timeout = 8 * time.Second
        }).
        WithTransportOptions(func(tr *http.Transport) {
            tr.IdleConnTimeout = idleConnectionTimeout
            tr.WriteBufferSize = s3TransferBufferSize
            tr.ReadBufferSize = s3TransferBufferSize
            if skipTLSVerify {
                if tr.TLSClientConfig != nil {
                    tr.TLSClientConfig.InsecureSkipVerify = skipTLSVerify
                } else {
                    tr.TLSClientConfig = &tls.Config{
                        MinVersion:         awshttp.DefaultHTTPTransportTLSMinVersion,
                        InsecureSkipVerify: skipTLSVerify,
                    }
                }
            }
        })
    if timeout > 0 {
        c = c.WithTimeout(time.Duration(timeout) * time.Second)
    }
    return c
}

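// Note: timeout is expressed in seconds and zero means no overall client
// timeout; the 8 second dial timeout and the idle connection timeout still
// apply.
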
// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483; the code below is copied from rclone
func pathEscape(in string) string {
    var u url.URL
    u.Path = in
    return strings.ReplaceAll(u.String(), "+", "%2B")
}

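// For example, pathEscape("dir/file+name.txt") returns "dir/file%2Bname.txt":
// a literal '+' must be percent-encoded in the CopySource header because some
// S3-compatible backends would otherwise decode it as a space.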