// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3
// +build !nos3

package vfs

import (
    "context"
    "crypto/tls"
    "errors"
    "fmt"
    "io"
    "mime"
    "net"
    "net/http"
    "net/url"
    "os"
    "path"
    "path/filepath"
    "slices"
    "sort"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
    "github.com/aws/aws-sdk-go-v2/service/sts"
    "github.com/eikenb/pipeat"
    "github.com/pkg/sftp"

    "github.com/drakkan/sftpgo/v2/internal/logger"
    "github.com/drakkan/sftpgo/v2/internal/metric"
    "github.com/drakkan/sftpgo/v2/internal/util"
    "github.com/drakkan/sftpgo/v2/internal/version"
)

const (
    // using this mime type for directories improves compatibility with s3fs-fuse
    s3DirMimeType        = "application/x-directory"
    s3TransferBufferSize = 256 * 1024
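    // objects larger than this threshold are copied server-side using
    // multipart copy instead of a single CopyObject call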
    s3CopyObjectThreshold = 500 * 1024 * 1024
)

var (
    s3DirMimeTypes    = []string{s3DirMimeType, "httpd/unix-directory"}
    s3DefaultPageSize = int32(5000)
)

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
    connectionID string
    localTempDir string
    // if not empty this fs is mounted as a virtual folder in the specified path
    mountPath  string
    config     *S3FsConfig
    svc        *s3.Client
    ctxTimeout time.Duration
}

func init() {
    version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
    if localTempDir == "" {
        localTempDir = getLocalTempDir()
    }
    fs := &S3Fs{
        connectionID: connectionID,
        localTempDir: localTempDir,
        mountPath:    getMountPath(mountPath),
        config:       &s3Config,
        ctxTimeout:   30 * time.Second,
    }
    if err := fs.config.validate(); err != nil {
        return fs, err
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(
        getAWSHTTPClient(0, 30*time.Second, fs.config.SkipTLSVerify)),
    )
    if err != nil {
        return fs, fmt.Errorf("unable to get AWS config: %w", err)
    }
    if fs.config.Region != "" {
        awsConfig.Region = fs.config.Region
    }
    if !fs.config.AccessSecret.IsEmpty() {
        if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
            return fs, err
        }
        awsConfig.Credentials = aws.NewCredentialsCache(
            credentials.NewStaticCredentialsProvider(
                fs.config.AccessKey,
                fs.config.AccessSecret.GetPayload(),
                fs.config.SessionToken),
        )
    }
    fs.setConfigDefaults()
    if fs.config.RoleARN != "" {
        client := sts.NewFromConfig(awsConfig)
        creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
        awsConfig.Credentials = creds
    }
    fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
        o.AppID = version.GetVersionHash()
        o.UsePathStyle = fs.config.ForcePathStyle
        if fs.config.Endpoint != "" {
            o.BaseEndpoint = aws.String(fs.config.Endpoint)
        }
    })
    return fs, nil
}
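
// A minimal usage sketch, assuming an S3FsConfig populated and validated
// elsewhere (cfg is hypothetical):
//
//	s3fs, err := NewS3Fs("connID", "", "", cfg)
//	if err != nil {
//		log.Fatal(err)
//	}
//	fi, err := s3fs.Stat("path/to/object")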

// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
    return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
    return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
    var result *FileInfo
    if name == "" || name == "/" || name == "." {
        return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
    }
    if fs.config.KeyPrefix == name+"/" {
        return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
    }
    obj, err := fs.headObject(name)
    if err == nil {
        // Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
        // So we check some common content types to detect if this is a "directory".
        isDir := slices.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
        if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
            _, err = fs.headObject(name + "/")
            isDir = err == nil
        }
        return NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
    }
    if !fs.IsNotExist(err) {
        return result, err
    }
    // now check if this is a prefix (virtual directory)
    hasContents, err := fs.hasContents(name)
    if err == nil && hasContents {
        return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
    } else if err != nil {
        return nil, err
    }
    // the requested file may still be a directory as a zero bytes key
    // with a trailing forward slash (created using mkdir).
    // S3 doesn't return the content type when listing objects, so we have
    // to create "dirs" by adding a trailing "/" to the key
    return fs.getStatForDir(name)
}

func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
    var result *FileInfo
    obj, err := fs.headObject(name + "/")
    if err != nil {
        return result, err
    }
    return NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
    return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) {
    r, w, err := pipeat.PipeInDir(fs.localTempDir)
    if err != nil {
        return nil, nil, nil, err
    }
    p := NewPipeReader(r)
    if readMetadata > 0 {
        attrs, err := fs.headObject(name)
        if err != nil {
            r.Close()
            w.Close()
            return nil, nil, nil, err
        }
        p.setMetadata(attrs.Metadata)
    }
    ctx, cancelFn := context.WithCancel(context.Background())
    downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
        d.Concurrency = fs.config.DownloadConcurrency
        d.PartSize = fs.config.DownloadPartSize
        if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
            d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
                o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
                    fs.config.SkipTLSVerify)
            })
        }
    })
    var streamRange *string
    if offset > 0 {
        streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
    }
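    // download in a background goroutine: the caller reads from the returned
    // PipeReader while the SDK downloader writes to the other end of the pipe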
    go func() {
        defer cancelFn()
        n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
            Bucket: aws.String(fs.config.Bucket),
            Key:    aws.String(name),
            Range:  streamRange,
        })
        w.CloseWithError(err) //nolint:errcheck
        fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
        metric.S3TransferCompleted(n, 1, err)
    }()
    return nil, p, cancelFn, nil
}

// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
    if checks&CheckParentDir != 0 {
        _, err := fs.Stat(path.Dir(name))
        if err != nil {
            return nil, nil, nil, err
        }
    }
    r, w, err := pipeat.PipeInDir(fs.localTempDir)
    if err != nil {
        return nil, nil, nil, err
    }
    var p PipeWriter
    if checks&CheckResume != 0 {
        p = newPipeWriterAtOffset(w, 0)
    } else {
        p = NewPipeWriter(w)
    }
    ctx, cancelFn := context.WithCancel(context.Background())
    uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
        u.Concurrency = fs.config.UploadConcurrency
        u.PartSize = fs.config.UploadPartSize
        if fs.config.UploadPartMaxTime > 0 {
            u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
                o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond,
                    fs.config.SkipTLSVerify)
            })
        }
    })
    go func() {
        defer cancelFn()
        var contentType string
        if flag == -1 {
            contentType = s3DirMimeType
        } else {
            contentType = mime.TypeByExtension(path.Ext(name))
        }
        _, err := uploader.Upload(ctx, &s3.PutObjectInput{
            Bucket:       aws.String(fs.config.Bucket),
            Key:          aws.String(name),
            Body:         r,
            ACL:          types.ObjectCannedACL(fs.config.ACL),
            StorageClass: types.StorageClass(fs.config.StorageClass),
            ContentType:  util.NilIfEmpty(contentType),
        })
        r.CloseWithError(err) //nolint:errcheck
        p.Done(err)
        fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, read bytes: %d, err: %+v",
            name, fs.config.ACL, r.GetReadedBytes(), err)
        metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
    }()
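    // to resume an upload we first replay the existing object into the pipe,
    // so the uploader rewrites the current content before the new data is appended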
    if checks&CheckResume != 0 {
        readCh := make(chan error, 1)
        go func() {
            n, err := fs.downloadToWriter(name, p)
            pw := p.(*pipeWriterAtOffset)
            pw.offset = 0
            pw.writeOffset = n
            readCh <- err
        }()
        err = <-readCh
        if err != nil {
            cancelFn()
            p.Close()
            fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
            return nil, nil, nil, err
        }
    }
    if uploadMode&4 != 0 {
        return nil, p, nil, nil
    }
    return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
    if source == target {
        return -1, -1, nil
    }
    _, err := fs.Stat(path.Dir(target))
    if err != nil {
        return -1, -1, err
    }
    fi, err := fs.Stat(source)
    if err != nil {
        return -1, -1, err
    }
    return fs.renameInternal(source, target, fi, 0)
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
    if isDir {
        hasContents, err := fs.hasContents(name)
        if err != nil {
            return err
        }
        if hasContents {
            return fmt.Errorf("cannot remove non empty directory: %q", name)
        }
        if !strings.HasSuffix(name, "/") {
            name += "/"
        }
    }
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    _, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
        Bucket: aws.String(fs.config.Bucket),
        Key:    aws.String(name),
    })
    metric.S3DeleteObjectCompleted(err)
    return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
    _, err := fs.Stat(name)
    if !fs.IsNotExist(err) {
        return err
    }
    return fs.mkdirInternal(name)
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(_, _ string) error {
    return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(_ string) (string, error) {
    return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
    return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
    return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(_ string, _, _ time.Time, _ bool) error {
    return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
    return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) (DirLister, error) {
    // dirname must be already cleaned
    prefix := fs.getPrefix(dirname)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:    aws.String(fs.config.Bucket),
        Prefix:    aws.String(prefix),
        Delimiter: aws.String("/"),
        MaxKeys:   &s3DefaultPageSize,
    })
    return &s3DirLister{
        paginator: paginator,
        timeout:   fs.ctxTimeout,
        prefix:    prefix,
        prefixes:  make(map[string]bool),
    }, nil
}

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
    return false
}

// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
    return size <= resumeMaxSize
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
    return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
    if err == nil {
        return false
    }
    var re *awshttp.ResponseError
    if errors.As(err, &re) {
        if re.Response != nil {
            return re.Response.StatusCode == http.StatusNotFound
        }
    }
    return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
    if err == nil {
        return false
    }
    var re *awshttp.ResponseError
    if errors.As(err, &re) {
        if re.Response != nil {
            return re.Response.StatusCode == http.StatusForbidden ||
                re.Response.StatusCode == http.StatusUnauthorized
        }
    }
    return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
    if err == nil {
        return false
    }
    return errors.Is(err, ErrVfsUnsupported)
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
    // we need a local directory for temporary files
    osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
    return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
    return fs.GetDirSize(fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
    prefix := fs.getPrefix(dirname)
    numFiles := 0
    size := int64(0)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:  aws.String(fs.config.Bucket),
        Prefix:  aws.String(prefix),
        MaxKeys: &s3DefaultPageSize,
    })
    for paginator.HasMorePages() {
        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
        defer cancelFn()
        page, err := paginator.NextPage(ctx)
        if err != nil {
            metric.S3ListObjectsCompleted(err)
            return numFiles, size, err
        }
        for _, fileObject := range page.Contents {
            isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
            objectSize := util.GetIntFromPointer(fileObject.Size)
            if isDir && objectSize == 0 {
                continue
            }
            numFiles++
            size += objectSize
        }
        fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
    }
    metric.S3ListObjectsCompleted(nil)
    return numFiles, size, nil
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
    return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
    rel := path.Clean(name)
    if rel == "." {
        rel = ""
    }
    if !path.IsAbs(rel) {
        rel = "/" + rel
    }
    if fs.config.KeyPrefix != "" {
        if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
            rel = "/"
        }
        rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
    }
    if fs.mountPath != "" {
        rel = path.Join(fs.mountPath, rel)
    }
    return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
    prefix := fs.getPrefix(root)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:  aws.String(fs.config.Bucket),
        Prefix:  aws.String(prefix),
        MaxKeys: &s3DefaultPageSize,
    })
    for paginator.HasMorePages() {
        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
        defer cancelFn()
        page, err := paginator.NextPage(ctx)
        if err != nil {
            metric.S3ListObjectsCompleted(err)
            walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
            return err
        }
        for _, fileObject := range page.Contents {
            name, isDir := fs.resolve(fileObject.Key, prefix)
            if name == "" {
                continue
            }
            err := walkFn(util.GetStringFromPointer(fileObject.Key),
                NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size),
                    util.GetTimeFromPointer(fileObject.LastModified), false), nil)
            if err != nil {
                return err
            }
        }
    }
    metric.S3ListObjectsCompleted(nil)
    walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
    return nil
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
    return strings.TrimPrefix(path.Join(elem...), "/")
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
    return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
    if fs.mountPath != "" {
        virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
    }
    if !path.IsAbs(virtualPath) {
        virtualPath = path.Clean("/" + virtualPath)
    }
    return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) (int, int64, error) {
    numFiles := 1
    sizeDiff := srcSize
    attrs, err := fs.headObject(target)
    if err == nil {
        sizeDiff -= util.GetIntFromPointer(attrs.ContentLength)
        numFiles = 0
    } else {
        if !fs.IsNotExist(err) {
            return 0, 0, err
        }
    }
    if err := fs.copyFileInternal(source, target, srcSize); err != nil {
        return 0, 0, err
    }
    return numFiles, sizeDiff, nil
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
    result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
    isDir := strings.HasSuffix(result, "/")
    if isDir {
        result = strings.TrimSuffix(result, "/")
    }
    return result, isDir
}
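
// setConfigDefaults fills in SDK defaults for any zero-valued transfer
// settings; part sizes smaller than 1 MiB are interpreted as a number of MiB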
func (fs *S3Fs) setConfigDefaults() {
    if fs.config.UploadPartSize == 0 {
        fs.config.UploadPartSize = manager.DefaultUploadPartSize
    } else {
        if fs.config.UploadPartSize < 1024*1024 {
            fs.config.UploadPartSize *= 1024 * 1024
        }
    }
    if fs.config.UploadConcurrency == 0 {
        fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
    }
    if fs.config.DownloadPartSize == 0 {
        fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
    } else {
        if fs.config.DownloadPartSize < 1024*1024 {
            fs.config.DownloadPartSize *= 1024 * 1024
        }
    }
    if fs.config.DownloadConcurrency == 0 {
        fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
    }
}

func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
    contentType := mime.TypeByExtension(path.Ext(source))
    copySource := pathEscape(fs.Join(fs.config.Bucket, source))
    if fileSize > s3CopyObjectThreshold {
        fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
            source, fileSize)
        err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
        metric.S3CopyObjectCompleted(err)
        return err
    }
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    _, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
        Bucket:       aws.String(fs.config.Bucket),
        CopySource:   aws.String(copySource),
        Key:          aws.String(target),
        StorageClass: types.StorageClass(fs.config.StorageClass),
        ACL:          types.ObjectCannedACL(fs.config.ACL),
        ContentType:  util.NilIfEmpty(contentType),
    })
    metric.S3CopyObjectCompleted(err)
    return err
}
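
// renameInternal renames by copying and then deleting the source.
// With renameMode 0 only empty directories can be renamed, while with
// renameMode 1 directory contents are renamed recursively via doRecursiveRename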
func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo, recursion int) (int, int64, error) {
    var numFiles int
    var filesSize int64
    if fi.IsDir() {
        if renameMode == 0 {
            hasContents, err := fs.hasContents(source)
            if err != nil {
                return numFiles, filesSize, err
            }
            if hasContents {
                return numFiles, filesSize, fmt.Errorf("%w: cannot rename non empty directory: %q", ErrVfsUnsupported, source)
            }
        }
        if err := fs.mkdirInternal(target); err != nil {
            return numFiles, filesSize, err
        }
        if renameMode == 1 {
            files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion)
            numFiles += files
            filesSize += size
            if err != nil {
                return numFiles, filesSize, err
            }
        }
    } else {
        if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
            return numFiles, filesSize, err
        }
        numFiles++
        filesSize += fi.Size()
    }
    err := fs.Remove(source, fi.IsDir())
    if fs.IsNotExist(err) {
        err = nil
    }
    return numFiles, filesSize, err
}

func (fs *S3Fs) mkdirInternal(name string) error {
    if !strings.HasSuffix(name, "/") {
        name += "/"
    }
    _, w, _, err := fs.Create(name, -1, 0)
    if err != nil {
        return err
    }
    return w.Close()
}

func (fs *S3Fs) hasContents(name string) (bool, error) {
    prefix := fs.getPrefix(name)
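    // list at most two keys: the first may be the directory placeholder
    // object itself (skipped below), so a second key is needed to prove
    // the directory has real content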
    maxKeys := int32(2)
    paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
        Bucket:  aws.String(fs.config.Bucket),
        Prefix:  aws.String(prefix),
        MaxKeys: &maxKeys,
    })
    if paginator.HasMorePages() {
        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
        defer cancelFn()
        page, err := paginator.NextPage(ctx)
        metric.S3ListObjectsCompleted(err)
        if err != nil {
            return false, err
        }
        for _, obj := range page.Contents {
            name, _ := fs.resolve(obj.Key, prefix)
            if name == "" || name == "/" {
                continue
            }
            return true, nil
        }
        return false, nil
    }
    metric.S3ListObjectsCompleted(nil)
    return false, nil
}

func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
        Bucket:       aws.String(fs.config.Bucket),
        Key:          aws.String(target),
        StorageClass: types.StorageClass(fs.config.StorageClass),
        ACL:          types.ObjectCannedACL(fs.config.ACL),
        ContentType:  util.NilIfEmpty(contentType),
    })
    if err != nil {
        return fmt.Errorf("unable to create multipart copy request: %w", err)
    }
    uploadID := util.GetStringFromPointer(res.UploadId)
    if uploadID == "" {
        return errors.New("unable to get multipart copy upload ID")
    }
    // We use a 32 MB part size and copy 10 parts in parallel.
    // These values are arbitrary: we don't want to start too many goroutines.
    // For very large files we increase the part size to keep the part count low
    maxPartSize := int64(32 * 1024 * 1024)
    if fileSize > int64(100*1024*1024*1024) {
        maxPartSize = int64(500 * 1024 * 1024)
    }
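    // guard is a semaphore: sends block once 10 part copies are in flight,
    // and each finished goroutine receives from it to release a slot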
    guard := make(chan struct{}, 10)
    finished := false
    var completedParts []types.CompletedPart
    var partMutex sync.Mutex
    var wg sync.WaitGroup
    var hasError atomic.Bool
    var errOnce sync.Once
    var copyError error
    var partNumber int32
    var offset int64
    opCtx, opCancel := context.WithCancel(context.Background())
    defer opCancel()
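    // split [0, fileSize) into maxPartSize ranges. For example, a 100 MiB
    // source with 32 MiB parts is copied as bytes=0-33554431, 33554432-67108863,
    // 67108864-100663295 and a final short part 100663296-104857599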
    for partNumber = 1; !finished; partNumber++ {
        start := offset
        end := offset + maxPartSize
        if end >= fileSize {
            end = fileSize
            finished = true
        }
        offset = end
        guard <- struct{}{}
        if hasError.Load() {
            fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
            break
        }
        wg.Add(1)
        go func(partNum int32, partStart, partEnd int64) {
            defer func() {
                <-guard
                wg.Done()
            }()
            innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
            defer innerCancelFn()
            partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
                Bucket:          aws.String(fs.config.Bucket),
                CopySource:      aws.String(source),
                Key:             aws.String(target),
                PartNumber:      &partNum,
                UploadId:        aws.String(uploadID),
                CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
            })
            if err != nil {
                errOnce.Do(func() {
                    fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
                    hasError.Store(true)
                    copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
                    opCancel()
                    abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
                    defer abortCancelFn()
                    _, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
                        Bucket:   aws.String(fs.config.Bucket),
                        Key:      aws.String(target),
                        UploadId: aws.String(uploadID),
                    })
                    if errAbort != nil {
                        fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
                    }
                })
                return
            }
            partMutex.Lock()
            completedParts = append(completedParts, types.CompletedPart{
                ETag:       partResp.CopyPartResult.ETag,
                PartNumber: &partNum,
            })
            partMutex.Unlock()
        }(partNumber, start, end)
    }
    wg.Wait()
    close(guard)
    if copyError != nil {
        return copyError
    }
    sort.Slice(completedParts, func(i, j int) bool {
        getPartNumber := func(number *int32) int32 {
            if number == nil {
                return 0
            }
            return *number
        }
        return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
    })
    completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer completeCancelFn()
    _, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
        Bucket:   aws.String(fs.config.Bucket),
        Key:      aws.String(target),
        UploadId: aws.String(uploadID),
        MultipartUpload: &types.CompletedMultipartUpload{
            Parts: completedParts,
        },
    })
    if err != nil {
        return fmt.Errorf("unable to complete multipart upload: %w", err)
    }
    return nil
}

func (fs *S3Fs) getPrefix(name string) string {
    prefix := ""
    if name != "" && name != "." && name != "/" {
        prefix = strings.TrimPrefix(name, "/")
        if !strings.HasSuffix(prefix, "/") {
            prefix += "/"
        }
    }
    return prefix
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
    defer cancelFn()
    obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
        Bucket: aws.String(fs.config.Bucket),
        Key:    aws.String(name),
    })
    metric.S3HeadObjectCompleted(err)
    return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
    obj, err := fs.headObject(name)
    if err != nil {
        return "", err
    }
    return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
    return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
    return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
    fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
    ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
    defer cancelFn()
    downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
        d.Concurrency = fs.config.DownloadConcurrency
        d.PartSize = fs.config.DownloadPartSize
        if fs.config.DownloadPartMaxTime > 0 {
            d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
                o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
                    fs.config.SkipTLSVerify)
            })
        }
    })
    n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
        Bucket: aws.String(fs.config.Bucket),
        Key:    aws.String(name),
    })
    fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
        name, n, err)
    metric.S3TransferCompleted(n, 1, err)
    return n, err
}

type s3DirLister struct {
    baseDirLister
    paginator     *s3.ListObjectsV2Paginator
    timeout       time.Duration
    prefix        string
    prefixes      map[string]bool
    metricUpdated bool
}

func (l *s3DirLister) resolve(name *string) (string, bool) {
    result := strings.TrimPrefix(util.GetStringFromPointer(name), l.prefix)
    isDir := strings.HasSuffix(result, "/")
    if isDir {
        result = strings.TrimSuffix(result, "/")
    }
    return result, isDir
}
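
// Next returns the next batch of directory entries. CommonPrefixes are
// reported as virtual directories; l.prefixes deduplicates them against
// directory placeholder objects returned in Contents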
func (l *s3DirLister) Next(limit int) ([]os.FileInfo, error) {
    if limit <= 0 {
        return nil, errInvalidDirListerLimit
    }
    if len(l.cache) >= limit {
        return l.returnFromCache(limit), nil
    }
    if !l.paginator.HasMorePages() {
        if !l.metricUpdated {
            l.metricUpdated = true
            metric.S3ListObjectsCompleted(nil)
        }
        return l.returnFromCache(limit), io.EOF
    }
    ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(l.timeout))
    defer cancelFn()
    page, err := l.paginator.NextPage(ctx)
    if err != nil {
        metric.S3ListObjectsCompleted(err)
        return l.cache, err
    }
    for _, p := range page.CommonPrefixes {
        // prefixes have a trailing slash
        name, _ := l.resolve(p.Prefix)
        if name == "" {
            continue
        }
        if _, ok := l.prefixes[name]; ok {
            continue
        }
        l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
        l.prefixes[name] = true
    }
    for _, fileObject := range page.Contents {
        objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
        objectSize := util.GetIntFromPointer(fileObject.Size)
        name, isDir := l.resolve(fileObject.Key)
        if name == "" || name == "/" {
            continue
        }
        if isDir {
            if _, ok := l.prefixes[name]; ok {
                continue
            }
            l.prefixes[name] = true
        }
        l.cache = append(l.cache, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
    }
    return l.returnFromCache(limit), nil
}

func (l *s3DirLister) Close() error {
    return l.baseDirLister.Close()
}
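
// getAWSHTTPClient returns the HTTP client to use for AWS requests.
// timeout is the overall request timeout in seconds, zero means no
// overall timeout; idleConnectionTimeout is applied to the transport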
func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration, skipTLSVerify bool) *awshttp.BuildableClient {
    c := awshttp.NewBuildableClient().
        WithDialerOptions(func(d *net.Dialer) {
            d.Timeout = 8 * time.Second
        }).
        WithTransportOptions(func(tr *http.Transport) {
            tr.IdleConnTimeout = idleConnectionTimeout
            tr.WriteBufferSize = s3TransferBufferSize
            tr.ReadBufferSize = s3TransferBufferSize
            if skipTLSVerify {
                if tr.TLSClientConfig != nil {
                    tr.TLSClientConfig.InsecureSkipVerify = skipTLSVerify
                } else {
                    tr.TLSClientConfig = &tls.Config{
                        MinVersion:         awshttp.DefaultHTTPTransportTLSMinVersion,
                        InsecureSkipVerify: skipTLSVerify,
                    }
                }
            }
        })
    if timeout > 0 {
        c = c.WithTimeout(time.Duration(timeout) * time.Second)
    }
    return c
}

// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483; the code below is copied from rclone
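// For example (under these escaping rules), pathEscape("bucket/a b+c")
// returns "bucket/a%20b%2Bc": url.URL percent-encodes the space, and "+" is
// escaped explicitly because some vendors would otherwise decode it as a space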
func pathEscape(in string) string {
    var u url.URL
    u.Path = in
    return strings.ReplaceAll(u.String(), "+", "%2B")
}