s3fs.go

// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3
// +build !nos3

package vfs

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"mime"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/internal/logger"
	"github.com/drakkan/sftpgo/v2/internal/metric"
	"github.com/drakkan/sftpgo/v2/internal/util"
	"github.com/drakkan/sftpgo/v2/internal/version"
)

const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType        = "application/x-directory"
	s3TransferBufferSize = 256 * 1024
)

var (
	s3DirMimeTypes    = []string{s3DirMimeType, "httpd/unix-directory"}
	s3DefaultPageSize = int32(5000)
)

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty this fs is mounted as a virtual folder in the specified path
	mountPath  string
	config     *S3FsConfig
	svc        *s3.Client
	ctxTimeout time.Duration
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		localTempDir = getLocalTempDir()
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(
		getAWSHTTPClient(0, 30*time.Second, fs.config.SkipTLSVerify)),
	)
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	fs.setConfigDefaults()
	if fs.config.RoleARN != "" {
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.AppID = fmt.Sprintf("SFTPGo-%s", version.Get().CommitHash)
		o.UsePathStyle = fs.config.ForcePathStyle
		if fs.config.Endpoint != "" {
			o.BaseEndpoint = aws.String(fs.config.Endpoint)
		}
	})
	return fs, nil
}

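// A minimal usage sketch (hypothetical values; it assumes Bucket and Region
// are settable fields promoted on S3FsConfig, as the reads of fs.config in
// this file suggest):
//
//	var cfg S3FsConfig
//	cfg.Bucket = "my-bucket"
//	cfg.Region = "us-east-1"
//	s3fs, err := NewS3Fs("connection-id", "", "", cfg)
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Println(s3fs.Name())
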
// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "" || name == "/" || name == "." {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	if fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
		// So we check some common content types to detect if this is a "directory".
		isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
		if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
			_, err = fs.headObject(name + "/")
			isDir = err == nil
		}
		return NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory as a zero bytes key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have to
	// create "dirs" by adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	return NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeReader(r)
	if readMetadata > 0 {
		attrs, err := fs.headObject(name)
		if err != nil {
			r.Close()
			w.Close()
			return nil, nil, nil, err
		}
		p.setMetadata(attrs.Metadata)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, p, cancelFn, nil
}

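// Reads that start mid-object are served with a standard HTTP Range request;
// for example (illustrative offset), opening at offset 1048576 sets
//
//	Range: bytes=1048576-
//
// so the downloader streams only the remainder of the object.
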
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	var p PipeWriter
	if checks&CheckResume != 0 {
		p = newPipeWriterAtOffset(w, 0)
	} else {
		p = NewPipeWriter(w)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, read bytes: %d, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	if checks&CheckResume != 0 {
		readCh := make(chan error, 1)
		go func() {
			n, err := fs.downloadToWriter(name, p)
			pw := p.(*pipeWriterAtOffset)
			pw.offset = 0
			pw.writeOffset = n
			readCh <- err
		}()
		err = <-readCh
		if err != nil {
			cancelFn()
			p.Close()
			fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
			return nil, nil, nil, err
		}
	}
	if uploadMode&4 != 0 {
		return nil, p, nil, nil
	}
	return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
	if source == target {
		return -1, -1, nil
	}
	_, err := fs.Stat(path.Dir(target))
	if err != nil {
		return -1, -1, err
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return -1, -1, err
	}
	return fs.renameInternal(source, target, fi, 0)
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non-empty directory: %q", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	return fs.mkdirInternal(name)
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(_ string, _, _ time.Time, _ bool) error {
	return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) (DirLister, error) {
	// dirname must be already cleaned
	prefix := fs.getPrefix(dirname)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
		MaxKeys:   &s3DefaultPageSize,
	})
	return &s3DirLister{
		paginator: paginator,
		timeout:   fs.ctxTimeout,
		prefix:    prefix,
		prefixes:  make(map[string]bool),
	}, nil
}

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
	return size <= resumeMaxSize
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusNotFound
		}
	}
	return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusForbidden ||
				re.Response.StatusCode == http.StatusUnauthorized
		}
	}
	return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return err == ErrVfsUnsupported
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	return fs.GetDirSize(fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
	prefix := fs.getPrefix(dirname)
	numFiles := 0
	size := int64(0)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return numFiles, size, err
		}
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
			objectSize := util.GetIntFromPointer(fileObject.Size)
			if isDir && objectSize == 0 {
				continue
			}
			numFiles++
			size += objectSize
			if numFiles%1000 == 0 {
				fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	return numFiles, size, nil
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}

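// A worked example (hypothetical values): with KeyPrefix "data/" and
// mountPath "/s3", GetRelativePath("data/docs/file.txt") first cleans the
// name to "/data/docs/file.txt", strips the "/data/" prefix to get
// "/docs/file.txt", then prepends the mount path, returning
// "/s3/docs/file.txt". A name outside the key prefix collapses to the
// mount path root.
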
// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := fs.getPrefix(root)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
			return err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			err := walkFn(util.GetStringFromPointer(fileObject.Key),
				NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size),
					util.GetTimeFromPointer(fileObject.LastModified), false), nil)
			if err != nil {
				return err
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
	return nil
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return strings.TrimPrefix(path.Join(elem...), "/")
}

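// For example, Join("foo", "bar") and Join("/foo", "bar") both return
// "foo/bar": object keys are bucket-relative, so the leading slash produced
// by path.Join is trimmed.
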
// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) error {
	return fs.copyFileInternal(source, target, srcSize)
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

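// For example, with prefix "photos/" the key "photos/2023/" resolves to
// ("2023", true) and "photos/img.jpg" resolves to ("img.jpg", false).
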
func (fs *S3Fs) setConfigDefaults() {
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = manager.DefaultUploadPartSize
	} else {
		if fs.config.UploadPartSize < 1024*1024 {
			fs.config.UploadPartSize *= 1024 * 1024
		}
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
	} else {
		if fs.config.DownloadPartSize < 1024*1024 {
			fs.config.DownloadPartSize *= 1024 * 1024
		}
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
	}
}

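// Part sizes below 1 MiB are interpreted as MiB counts: e.g. a configured
// UploadPartSize of 16 becomes 16 * 1024 * 1024 bytes, while a value of
// 8388608 (8 MiB) is already in bytes and is used as-is.
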
func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))
	if fileSize > 500*1024*1024 {
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fileSize)
		err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(copySource),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	metric.S3CopyObjectCompleted(err)
	return err
}

func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo, recursion int) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if fi.IsDir() {
		if renameMode == 0 {
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("cannot rename non-empty directory: %q", source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion)
			numFiles += files
			filesSize += size
			if err != nil {
				return numFiles, filesSize, err
			}
		}
	} else {
		if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += fi.Size()
	}
	err := fs.Remove(source, fi.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}

func (fs *S3Fs) mkdirInternal(name string) error {
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	_, w, _, err := fs.Create(name, -1, 0)
	if err != nil {
		return err
	}
	return w.Close()
}

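// Directories are emulated: Mkdir("foo") stores a zero-byte object with key
// "foo/", and the flag value -1 makes Create tag it with the
// "application/x-directory" content type declared above.
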
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	maxKeys := int32(2)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxKeys,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}

func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		maxPartSize = int64(500 * 1024 * 1024)
	}

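	// A quick sanity check on these numbers (arithmetic only, sizes are
	// illustrative): a 1 GiB source is copied as 1024/32 = 32 parts, with at
	// most 10 UploadPartCopy calls in flight thanks to the guard channel
	// below; sources over 100 GiB switch to 500 MiB parts (S3 allows at most
	// 10,000 parts per multipart upload).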
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once
	var copyError error
	var partNumber int32
	var offset int64

	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()

	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end
		guard <- struct{}{}
		if hasError.Load() {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:          aws.String(fs.config.Bucket),
				CopySource:      aws.String(source),
				Key:             aws.String(target),
				PartNumber:      &partNum,
				UploadId:        aws.String(uploadID),
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()
					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: &partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)
	if copyError != nil {
		return copyError
	}
	sort.Slice(completedParts, func(i, j int) bool {
		getPartNumber := func(number *int32) int32 {
			if number == nil {
				return 0
			}
			return *number
		}
		return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
	})
	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}

func (fs *S3Fs) getPrefix(name string) string {
	prefix := ""
	if name != "" && name != "." && name != "/" {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	return prefix
}

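// Examples: getPrefix("/photos/2023") returns "photos/2023/"; "", "." and "/"
// all map to the empty prefix, i.e. the bucket root.
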
func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
	defer cancelFn()
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
		name, n, err)
	metric.S3TransferCompleted(n, 1, err)
	return n, err
}

type s3DirLister struct {
	baseDirLister
	paginator     *s3.ListObjectsV2Paginator
	timeout       time.Duration
	prefix        string
	prefixes      map[string]bool
	metricUpdated bool
}

func (l *s3DirLister) resolve(name *string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), l.prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

func (l *s3DirLister) Next(limit int) ([]os.FileInfo, error) {
	if limit <= 0 {
		return nil, errInvalidDirListerLimit
	}
	if len(l.cache) >= limit {
		return l.returnFromCache(limit), nil
	}
	if !l.paginator.HasMorePages() {
		if !l.metricUpdated {
			l.metricUpdated = true
			metric.S3ListObjectsCompleted(nil)
		}
		return l.returnFromCache(limit), io.EOF
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(l.timeout))
	defer cancelFn()
	page, err := l.paginator.NextPage(ctx)
	if err != nil {
		metric.S3ListObjectsCompleted(err)
		return l.cache, err
	}
	for _, p := range page.CommonPrefixes {
		// prefixes have a trailing slash
		name, _ := l.resolve(p.Prefix)
		if name == "" {
			continue
		}
		if _, ok := l.prefixes[name]; ok {
			continue
		}
		l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
		l.prefixes[name] = true
	}
	for _, fileObject := range page.Contents {
		objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
		objectSize := util.GetIntFromPointer(fileObject.Size)
		name, isDir := l.resolve(fileObject.Key)
		if name == "" || name == "/" {
			continue
		}
		if isDir {
			if _, ok := l.prefixes[name]; ok {
				continue
			}
			l.prefixes[name] = true
		}
		l.cache = append(l.cache, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
	}
	return l.returnFromCache(limit), nil
}

func (l *s3DirLister) Close() error {
	return l.baseDirLister.Close()
}

func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration, skipTLSVerify bool) *awshttp.BuildableClient {
	c := awshttp.NewBuildableClient().
		WithDialerOptions(func(d *net.Dialer) {
			d.Timeout = 8 * time.Second
		}).
		WithTransportOptions(func(tr *http.Transport) {
			tr.IdleConnTimeout = idleConnectionTimeout
			tr.WriteBufferSize = s3TransferBufferSize
			tr.ReadBufferSize = s3TransferBufferSize
			if skipTLSVerify {
				if tr.TLSClientConfig != nil {
					tr.TLSClientConfig.InsecureSkipVerify = skipTLSVerify
				} else {
					tr.TLSClientConfig = &tls.Config{
						MinVersion:         awshttp.DefaultHTTPTransportTLSMinVersion,
						InsecureSkipVerify: skipTLSVerify,
					}
				}
			}
		})
	if timeout > 0 {
		c = c.WithTimeout(time.Duration(timeout) * time.Second)
	}
	return c
}

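// For example, NewS3Fs requests getAWSHTTPClient(0, 30*time.Second,
// fs.config.SkipTLSVerify): a timeout of 0 means no overall request timeout,
// while the per-part transfer clients pass UploadPartMaxTime or
// DownloadPartMaxTime (in seconds) together with a short 100ms idle timeout.
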
// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483, the code below is copied from rclone
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}
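
// For example, pathEscape("bucket/a b+c.txt") returns "bucket/a%20b%2Bc.txt":
// url.URL escapes the space, while the explicit ReplaceAll handles "+", which
// is a valid path character that some vendors would otherwise decode as a space.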