// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3
// +build !nos3

package vfs

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"crypto/tls"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"mime"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"slices"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/internal/logger"
	"github.com/drakkan/sftpgo/v2/internal/metric"
	"github.com/drakkan/sftpgo/v2/internal/util"
	"github.com/drakkan/sftpgo/v2/internal/version"
)

const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType        = "application/x-directory"
	s3TransferBufferSize = 256 * 1024
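	// objects larger than this threshold are copied using multipart copy,
	// since on AWS a single CopyObject call is limited to 5 GiB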
	s3CopyObjectThreshold = 500 * 1024 * 1024
)

var (
	s3DirMimeTypes    = []string{s3DirMimeType, "httpd/unix-directory"}
	s3DefaultPageSize = int32(5000)
)

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty, this fs is mounted as a virtual folder at the specified path
	mountPath         string
	config            *S3FsConfig
	svc               *s3.Client
	ctxTimeout        time.Duration
	sseCustomerKey    string
	sseCustomerKeyMD5 string
	sseCustomerAlgo   string
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3-compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		localTempDir = getLocalTempDir()
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(
		getAWSHTTPClient(0, 30*time.Second, fs.config.SkipTLSVerify)),
	)
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(
				fs.config.AccessKey,
				fs.config.AccessSecret.GetPayload(),
				fs.config.SessionToken),
		)
	}
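	// SSE-C requires a 256-bit key: if the configured key is exactly 32 bytes
	// it is used as-is, otherwise a key is derived from it via SHA-256. The
	// key and its MD5 digest are base64 encoded, as the SSE-C headers expect.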
	if !fs.config.SSECustomerKey.IsEmpty() {
		if err := fs.config.SSECustomerKey.TryDecrypt(); err != nil {
			return fs, err
		}
		key := fs.config.SSECustomerKey.GetPayload()
		if len(key) == 32 {
			md5sumBinary := md5.Sum([]byte(key))
			fs.sseCustomerKey = base64.StdEncoding.EncodeToString([]byte(key))
			fs.sseCustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:])
		} else {
			keyHash := sha256.Sum256([]byte(key))
			md5sumBinary := md5.Sum(keyHash[:])
			fs.sseCustomerKey = base64.StdEncoding.EncodeToString(keyHash[:])
			fs.sseCustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:])
		}
		fs.sseCustomerAlgo = "AES256"
	}
	fs.setConfigDefaults()
	if fs.config.RoleARN != "" {
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.AppID = version.GetVersionHash()
		o.UsePathStyle = fs.config.ForcePathStyle
		o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
		o.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
		if fs.config.Endpoint != "" {
			o.BaseEndpoint = aws.String(fs.config.Endpoint)
		}
	})
	return fs, nil
}

// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated with this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "" || name == "/" || name == "." {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	if fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
		// So we check some common content types to detect if this is a "directory".
		isDir := slices.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
		if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
			_, err = fs.headObject(name + "/")
			isDir = err == nil
		}
		info := NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false)
		return info, nil
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory stored as a zero-byte key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have to
	// create "dirs" by adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	return NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) {
	r, w, err := createPipeFn(fs.localTempDir, fs.config.DownloadPartSize*int64(fs.config.DownloadConcurrency)+1)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeReader(r)
	if readMetadata > 0 {
		attrs, err := fs.headObject(name)
		if err != nil {
			r.Close()
			w.Close()
			return nil, nil, nil, err
		}
		p.setMetadata(attrs.Metadata)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
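	// a non-zero offset (e.g. a resumed download) is served by passing an
	// HTTP Range header of the form "bytes=<offset>-" to GetObject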
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket:               aws.String(fs.config.Bucket),
			Key:                  aws.String(name),
			Range:                streamRange,
			SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
			SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
			SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, p, cancelFn, nil
}

// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	r, w, err := createPipeFn(fs.localTempDir, fs.config.UploadPartSize+1024*1024)
	if err != nil {
		return nil, nil, nil, err
	}
	var p PipeWriter
	if checks&CheckResume != 0 {
		p = newPipeWriterAtOffset(w, 0)
	} else {
		p = NewPipeWriter(w)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
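		// a flag value of -1 is the internal convention used by mkdirInternal
		// below to upload a zero-byte "directory" placeholder object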
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:               aws.String(fs.config.Bucket),
			Key:                  aws.String(name),
			Body:                 r,
			ACL:                  types.ObjectCannedACL(fs.config.ACL),
			StorageClass:         types.StorageClass(fs.config.StorageClass),
			ContentType:          util.NilIfEmpty(contentType),
			SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
			SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
			SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, read bytes: %d, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	if checks&CheckResume != 0 {
		readCh := make(chan error, 1)
		go func() {
			n, err := fs.downloadToWriter(name, p)
			pw := p.(*pipeWriterAtOffset)
			pw.offset = 0
			pw.writeOffset = n
			readCh <- err
		}()
		err = <-readCh
		if err != nil {
			cancelFn()
			p.Close()
			fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
			return nil, nil, nil, err
		}
	}
	if uploadMode&4 != 0 {
		return nil, p, nil, nil
	}
	return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
func (fs *S3Fs) Rename(source, target string, checks int) (int, int64, error) {
	if source == target {
		return -1, -1, nil
	}
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(target))
		if err != nil {
			return -1, -1, err
		}
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return -1, -1, err
	}
	return fs.renameInternal(source, target, fi, 0, checks&CheckUpdateModTime != 0)
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non-empty directory: %q", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	return fs.mkdirInternal(name)
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(_ string, _, _ time.Time, _ bool) error {
	return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) (DirLister, error) {
	// dirname must be already cleaned
	prefix := fs.getPrefix(dirname)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
		MaxKeys:   &s3DefaultPageSize,
	})
	return &s3DirLister{
		paginator: paginator,
		timeout:   fs.ctxTimeout,
		prefix:    prefix,
		prefixes:  make(map[string]bool),
	}, nil
}

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsConditionalUploadResumeSupported returns whether resuming uploads is
// supported for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
	return size <= resumeMaxSize
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusNotFound
		}
	}
	return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusForbidden ||
				re.Response.StatusCode == http.StatusUnauthorized
		}
	}
	return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return errors.Is(err, ErrVfsUnsupported)
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	return fs.GetDirSize(fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
	prefix := fs.getPrefix(dirname)
	numFiles := 0
	size := int64(0)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return numFiles, size, err
		}
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
			objectSize := util.GetIntFromPointer(fileObject.Size)
			if isDir && objectSize == 0 {
				continue
			}
			numFiles++
			size += objectSize
		}
		fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
	}
	metric.S3ListObjectsCompleted(nil)
	return numFiles, size, nil
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := fs.getPrefix(root)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
			return err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			err := walkFn(util.GetStringFromPointer(fileObject.Key),
				NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size),
					util.GetTimeFromPointer(fileObject.LastModified), false), nil)
			if err != nil {
				return err
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
	return nil
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return strings.TrimPrefix(path.Join(elem...), "/")
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcInfo os.FileInfo) (int, int64, error) {
	numFiles := 1
	sizeDiff := srcInfo.Size()
	attrs, err := fs.headObject(target)
	if err == nil {
		sizeDiff -= util.GetIntFromPointer(attrs.ContentLength)
		numFiles = 0
	} else if !fs.IsNotExist(err) {
		return 0, 0, err
	}
	if err := fs.copyFileInternal(source, target, srcInfo); err != nil {
		return 0, 0, err
	}
	return numFiles, sizeDiff, nil
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

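// setConfigDefaults falls back to the aws-sdk-go-v2 manager defaults for any
// unset value. Part sizes smaller than 1 MiB are assumed to be expressed in
// MiB and are converted to bytes.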
func (fs *S3Fs) setConfigDefaults() {
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = manager.DefaultUploadPartSize
	} else if fs.config.UploadPartSize < 1024*1024 {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
	} else if fs.config.DownloadPartSize < 1024*1024 {
		fs.config.DownloadPartSize *= 1024 * 1024
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
	}
}

func (fs *S3Fs) copyFileInternal(source, target string, srcInfo os.FileInfo) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))
	if srcInfo.Size() > s3CopyObjectThreshold {
		fsLog(fs, logger.LevelDebug, "copying file %q with size %d using multipart copy",
			source, srcInfo.Size())
		err := fs.doMultipartCopy(copySource, target, contentType, srcInfo.Size())
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	copyObject := &s3.CopyObjectInput{
		Bucket:                         aws.String(fs.config.Bucket),
		CopySource:                     aws.String(copySource),
		Key:                            aws.String(target),
		StorageClass:                   types.StorageClass(fs.config.StorageClass),
		ACL:                            types.ObjectCannedACL(fs.config.ACL),
		ContentType:                    util.NilIfEmpty(contentType),
		CopySourceSSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		CopySourceSSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		CopySourceSSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
		SSECustomerKey:                 util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm:           util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:              util.NilIfEmpty(fs.sseCustomerKeyMD5),
	}
	_, err := fs.svc.CopyObject(ctx, copyObject)
	metric.S3CopyObjectCompleted(err)
	return err
}

func (fs *S3Fs) renameInternal(source, target string, srcInfo os.FileInfo, recursion int,
	updateModTime bool,
) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if srcInfo.IsDir() {
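		// renameMode is a package-level setting: with the default value (0)
		// only empty directories can be renamed, while mode 1 renames
		// directories recursively using copy and delete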
		if renameMode == 0 {
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("%w: cannot rename non-empty directory: %q", ErrVfsUnsupported, source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion, updateModTime)
			numFiles += files
			filesSize += size
			if err != nil {
				return numFiles, filesSize, err
			}
		}
	} else {
		if err := fs.copyFileInternal(source, target, srcInfo); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += srcInfo.Size()
	}
	err := fs.Remove(source, srcInfo.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}

func (fs *S3Fs) mkdirInternal(name string) error {
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	_, w, _, err := fs.Create(name, -1, 0)
	if err != nil {
		return err
	}
	return w.Close()
}

func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
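	// listing up to two keys is enough to tell an empty "directory" apart
	// from a non-empty one: the zero-byte placeholder object for the prefix
	// itself, if any, is skipped below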
	maxKeys := int32(2)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxKeys,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}

func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:               aws.String(fs.config.Bucket),
		Key:                  aws.String(target),
		StorageClass:         types.StorageClass(fs.config.StorageClass),
		ACL:                  types.ObjectCannedACL(fs.config.ACL),
		ContentType:          util.NilIfEmpty(contentType),
		SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use a 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines.
	maxPartSize := int64(32 * 1024 * 1024)
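	// for very large files switch to 500 MB parts: S3 allows at most 10,000
	// parts per multipart upload, so bigger parts keep the part count well
	// below the limit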
	if fileSize > int64(100*1024*1024*1024) {
		maxPartSize = int64(500 * 1024 * 1024)
	}
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once
	var copyError error
	var partNumber int32
	var offset int64

	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()

	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end
		guard <- struct{}{}
		if hasError.Load() {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:                         aws.String(fs.config.Bucket),
				CopySource:                     aws.String(source),
				Key:                            aws.String(target),
				PartNumber:                     &partNum,
				UploadId:                       aws.String(uploadID),
				CopySourceRange:                aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
				CopySourceSSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
				CopySourceSSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
				CopySourceSSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
				SSECustomerKey:                 util.NilIfEmpty(fs.sseCustomerKey),
				SSECustomerAlgorithm:           util.NilIfEmpty(fs.sseCustomerAlgo),
				SSECustomerKeyMD5:              util.NilIfEmpty(fs.sseCustomerKeyMD5),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()

					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: &partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)

	if copyError != nil {
		return copyError
	}
	sort.Slice(completedParts, func(i, j int) bool {
		getPartNumber := func(number *int32) int32 {
			if number == nil {
				return 0
			}
			return *number
		}
		return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
	})
	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}

func (fs *S3Fs) getPrefix(name string) string {
	prefix := ""
	if name != "" && name != "." && name != "/" {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	return prefix
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket:               aws.String(fs.config.Bucket),
		Key:                  aws.String(name),
		SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
	defer cancelFn()
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
		Bucket:               aws.String(fs.config.Bucket),
		Key:                  aws.String(name),
		SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
	})
	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
		name, n, err)
	metric.S3TransferCompleted(n, 1, err)
	return n, err
}

type s3DirLister struct {
	baseDirLister
	paginator *s3.ListObjectsV2Paginator
	timeout   time.Duration
	prefix    string
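	// prefixes tracks directory names already returned, so that a directory
	// reported both as a common prefix and as a zero-byte "/"-suffixed object
	// is listed only once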
	prefixes      map[string]bool
	metricUpdated bool
}

func (l *s3DirLister) resolve(name *string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), l.prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

func (l *s3DirLister) Next(limit int) ([]os.FileInfo, error) {
	if limit <= 0 {
		return nil, errInvalidDirListerLimit
	}
	if len(l.cache) >= limit {
		return l.returnFromCache(limit), nil
	}
	if !l.paginator.HasMorePages() {
		if !l.metricUpdated {
			l.metricUpdated = true
			metric.S3ListObjectsCompleted(nil)
		}
		return l.returnFromCache(limit), io.EOF
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(l.timeout))
	defer cancelFn()
	page, err := l.paginator.NextPage(ctx)
	if err != nil {
		metric.S3ListObjectsCompleted(err)
		return l.cache, err
	}
	for _, p := range page.CommonPrefixes {
		// prefixes have a trailing slash
		name, _ := l.resolve(p.Prefix)
		if name == "" {
			continue
		}
		if _, ok := l.prefixes[name]; ok {
			continue
		}
		l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
		l.prefixes[name] = true
	}
	for _, fileObject := range page.Contents {
		objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
		objectSize := util.GetIntFromPointer(fileObject.Size)
		name, isDir := l.resolve(fileObject.Key)
		if name == "" || name == "/" {
			continue
		}
		if isDir {
			if _, ok := l.prefixes[name]; ok {
				continue
			}
			l.prefixes[name] = true
		}
		l.cache = append(l.cache, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
	}

	return l.returnFromCache(limit), nil
}

func (l *s3DirLister) Close() error {
	return l.baseDirLister.Close()
}

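// getAWSHTTPClient builds the HTTP client used for AWS requests: the timeout
// is expressed in seconds and, if zero, no overall request timeout is set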
func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration, skipTLSVerify bool) *awshttp.BuildableClient {
	c := awshttp.NewBuildableClient().
		WithDialerOptions(func(d *net.Dialer) {
			d.Timeout = 8 * time.Second
		}).
		WithTransportOptions(func(tr *http.Transport) {
			tr.IdleConnTimeout = idleConnectionTimeout
			tr.WriteBufferSize = s3TransferBufferSize
			tr.ReadBufferSize = s3TransferBufferSize
			if skipTLSVerify {
				if tr.TLSClientConfig != nil {
					tr.TLSClientConfig.InsecureSkipVerify = skipTLSVerify
				} else {
					tr.TLSClientConfig = &tls.Config{
						MinVersion:         awshttp.DefaultHTTPTransportTLSMinVersion,
						InsecureSkipVerify: skipTLSVerify,
					}
				}
			}
		})
	if timeout > 0 {
		c = c.WithTimeout(time.Duration(timeout) * time.Second)
	}
	return c
}

// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483; the code below is copied from rclone
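// For example, pathEscape("folder/name with space+plus") returns
// "folder/name%20with%20space%2Bplus": slashes are preserved, spaces are
// percent-encoded by url.URL, and '+' is escaped explicitly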
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}