// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3

package vfs

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"crypto/tls"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"mime"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"slices"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/internal/logger"
	"github.com/drakkan/sftpgo/v2/internal/metric"
	"github.com/drakkan/sftpgo/v2/internal/util"
	"github.com/drakkan/sftpgo/v2/internal/version"
)

const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType         = "application/x-directory"
	s3TransferBufferSize  = 256 * 1024
	s3CopyObjectThreshold = 500 * 1024 * 1024
)

var (
	s3DirMimeTypes    = []string{s3DirMimeType, "httpd/unix-directory"}
	s3DefaultPageSize = int32(5000)
)

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty, this fs is mounted as a virtual folder at the specified path
	mountPath         string
	config            *S3FsConfig
	svc               *s3.Client
	ctxTimeout        time.Duration
	sseCustomerKey    string
	sseCustomerKeyMD5 string
	sseCustomerAlgo   string
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		localTempDir = getLocalTempDir()
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(
		getAWSHTTPClient(0, 30*time.Second, fs.config.SkipTLSVerify)),
	)
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(
				fs.config.AccessKey,
				fs.config.AccessSecret.GetPayload(),
				fs.config.SessionToken),
		)
	}
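
	// SSE-C expects the base64-encoded 256-bit key plus the base64-encoded
	// MD5 digest of the raw key bytes. A 32-byte configured key is used
	// as-is; any other length is first derived into 32 bytes via SHA-256.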
	if !fs.config.SSECustomerKey.IsEmpty() {
		if err := fs.config.SSECustomerKey.TryDecrypt(); err != nil {
			return fs, err
		}
		key := fs.config.SSECustomerKey.GetPayload()
		if len(key) == 32 {
			md5sumBinary := md5.Sum([]byte(key))
			fs.sseCustomerKey = base64.StdEncoding.EncodeToString([]byte(key))
			fs.sseCustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:])
		} else {
			keyHash := sha256.Sum256([]byte(key))
			md5sumBinary := md5.Sum(keyHash[:])
			fs.sseCustomerKey = base64.StdEncoding.EncodeToString(keyHash[:])
			fs.sseCustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:])
		}
		fs.sseCustomerAlgo = "AES256"
	}
	fs.setConfigDefaults()
	if fs.config.RoleARN != "" {
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.AppID = version.GetVersionHash()
		o.UsePathStyle = fs.config.ForcePathStyle
		o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
		o.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
		if fs.config.Endpoint != "" {
			o.BaseEndpoint = aws.String(fs.config.Endpoint)
		}
	})
	return fs, nil
}
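
// A minimal usage sketch (illustrative only, not part of the original file;
// it assumes an S3FsConfig populated elsewhere with Bucket, Region and
// credentials):
//
//	s3fs, err := NewS3Fs("conn-id", "", "", cfg)
//	if err != nil {
//		return err
//	}
//	info, err := s3fs.Stat("path/to/key")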

// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "" || name == "/" || name == "." {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	if fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
		// So we check some common content types to detect if this is a "directory".
		isDir := slices.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
		if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
			_, err = fs.headObject(name + "/")
			isDir = err == nil
		}
		info := NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false)
		return info, nil
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory as a zero bytes key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have
	// to create "dirs" by adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	return NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) {
	r, w, err := createPipeFn(fs.localTempDir, fs.config.DownloadPartSize*int64(fs.config.DownloadConcurrency)+1)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeReader(r)
	if readMetadata > 0 {
		attrs, err := fs.headObject(name)
		if err != nil {
			r.Close()
			w.Close()
			return nil, nil, nil, err
		}
		p.setMetadata(attrs.Metadata)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket:               aws.String(fs.config.Bucket),
			Key:                  aws.String(name),
			Range:                streamRange,
			SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
			SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
			SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, p, cancelFn, nil
}
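
// Design note: manager.Downloader requires an io.WriterAt because parts are
// fetched concurrently and written at arbitrary offsets. Open therefore
// streams through the pipe returned by createPipeFn, whose write side
// supports offset writes and whose buffer is sized to hold
// DownloadPartSize*DownloadConcurrency bytes, and hands the read side to the
// caller together with a cancel function.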

// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	r, w, err := createPipeFn(fs.localTempDir, fs.config.UploadPartSize+1024*1024)
	if err != nil {
		return nil, nil, nil, err
	}
	var p PipeWriter
	if checks&CheckResume != 0 {
		p = newPipeWriterAtOffset(w, 0)
	} else {
		p = NewPipeWriter(w)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:               aws.String(fs.config.Bucket),
			Key:                  aws.String(name),
			Body:                 r,
			ACL:                  types.ObjectCannedACL(fs.config.ACL),
			StorageClass:         types.StorageClass(fs.config.StorageClass),
			ContentType:          util.NilIfEmpty(contentType),
			SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
			SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
			SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, read bytes: %d, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	if checks&CheckResume != 0 {
		readCh := make(chan error, 1)
		go func() {
			n, err := fs.downloadToWriter(name, p)
			pw := p.(*pipeWriterAtOffset)
			pw.offset = 0
			pw.writeOffset = n
			readCh <- err
		}()
		err = <-readCh
		if err != nil {
			cancelFn()
			p.Close()
			fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
			return nil, nil, nil, err
		}
	}
	if uploadMode&4 != 0 {
		return nil, p, nil, nil
	}
	return nil, p, cancelFn, nil
}
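
// Resume note: S3 cannot append to an existing object, so a resumed upload
// rewrites the whole key. The existing object is first streamed into the
// pipe via downloadToWriter, the writer offset is advanced past the
// downloaded bytes, and the uploader goroutine above re-uploads everything
// as a single new object.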

// Rename renames (moves) source to target.
func (fs *S3Fs) Rename(source, target string, checks int) (int, int64, error) {
	if source == target {
		return -1, -1, nil
	}
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(target))
		if err != nil {
			return -1, -1, err
		}
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return -1, -1, err
	}
	return fs.renameInternal(source, target, fi, 0, checks&CheckUpdateModTime != 0)
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %q", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	return fs.mkdirInternal(name)
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(_ string, _, _ time.Time, _ bool) error {
	return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside the base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) (DirLister, error) {
	// dirname must be already cleaned
	prefix := fs.getPrefix(dirname)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
		MaxKeys:   &s3DefaultPageSize,
	})
	return &s3DirLister{
		paginator: paginator,
		timeout:   fs.ctxTimeout,
		prefix:    prefix,
		prefixes:  make(map[string]bool),
	}, nil
}
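
// The "/" delimiter above makes the listing non-recursive: keys nested below
// the requested prefix are rolled up into CommonPrefixes, which the lister
// reports as directories, while direct children are returned in Contents.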

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsConditionalUploadResumeSupported returns whether resuming uploads is
// supported for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
	return size <= resumeMaxSize
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusNotFound
		}
	}
	return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusForbidden ||
				re.Response.StatusCode == http.StatusUnauthorized
		}
	}
	return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return errors.Is(err, ErrVfsUnsupported)
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	return fs.GetDirSize(fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
	prefix := fs.getPrefix(dirname)
	numFiles := 0
	size := int64(0)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return numFiles, size, err
		}
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
			objectSize := util.GetIntFromPointer(fileObject.Size)
			if isDir && objectSize == 0 {
				continue
			}
			numFiles++
			size += objectSize
		}
		fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
	}
	metric.S3ListObjectsCompleted(nil)
	return numFiles, size, nil
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := fs.getPrefix(root)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
			return err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			err := walkFn(util.GetStringFromPointer(fileObject.Key),
				NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size),
					util.GetTimeFromPointer(fileObject.LastModified), false), nil)
			if err != nil {
				return err
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
	return nil
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return strings.TrimPrefix(path.Join(elem...), "/")
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcInfo os.FileInfo) (int, int64, error) {
	numFiles := 1
	sizeDiff := srcInfo.Size()
	attrs, err := fs.headObject(target)
	if err == nil {
		sizeDiff -= util.GetIntFromPointer(attrs.ContentLength)
		numFiles = 0
	} else if !fs.IsNotExist(err) {
		return 0, 0, err
	}
	if err := fs.copyFileInternal(source, target, srcInfo); err != nil {
		return 0, 0, err
	}
	return numFiles, sizeDiff, nil
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}
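
// Part sizes below 1 MiB are interpreted as MiB multipliers, so a configured
// UploadPartSize of 5 becomes 5 MiB here.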
func (fs *S3Fs) setConfigDefaults() {
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = manager.DefaultUploadPartSize
	} else if fs.config.UploadPartSize < 1024*1024 {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
	} else if fs.config.DownloadPartSize < 1024*1024 {
		fs.config.DownloadPartSize *= 1024 * 1024
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
	}
}

func (fs *S3Fs) copyFileInternal(source, target string, srcInfo os.FileInfo) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))
	if srcInfo.Size() > s3CopyObjectThreshold {
		fsLog(fs, logger.LevelDebug, "copying file %q with size %d using multipart copy",
			source, srcInfo.Size())
		err := fs.doMultipartCopy(copySource, target, contentType, srcInfo.Size())
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	copyObject := &s3.CopyObjectInput{
		Bucket:                         aws.String(fs.config.Bucket),
		CopySource:                     aws.String(copySource),
		Key:                            aws.String(target),
		StorageClass:                   types.StorageClass(fs.config.StorageClass),
		ACL:                            types.ObjectCannedACL(fs.config.ACL),
		ContentType:                    util.NilIfEmpty(contentType),
		CopySourceSSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		CopySourceSSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		CopySourceSSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
		SSECustomerKey:                 util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm:           util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:              util.NilIfEmpty(fs.sseCustomerKeyMD5),
	}
	_, err := fs.svc.CopyObject(ctx, copyObject)
	metric.S3CopyObjectCompleted(err)
	return err
}
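
// CopySource must be the URL-escaped "bucket/key" of the source object, see
// pathEscape at the bottom of this file. Objects larger than
// s3CopyObjectThreshold (500 MB) go through doMultipartCopy: a single
// server-side CopyObject call is limited to 5 GB, and multipart copy also
// parallelizes the transfer.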

func (fs *S3Fs) renameInternal(source, target string, srcInfo os.FileInfo, recursion int,
	updateModTime bool,
) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if srcInfo.IsDir() {
		if renameMode == 0 {
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("%w: cannot rename non empty directory: %q", ErrVfsUnsupported, source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion, updateModTime)
			numFiles += files
			filesSize += size
			if err != nil {
				return numFiles, filesSize, err
			}
		}
	} else {
		if err := fs.copyFileInternal(source, target, srcInfo); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += srcInfo.Size()
	}
	err := fs.Remove(source, srcInfo.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}

func (fs *S3Fs) mkdirInternal(name string) error {
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	_, w, _, err := fs.Create(name, -1, 0)
	if err != nil {
		return err
	}
	return w.Close()
}

func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	maxKeys := int32(2)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxKeys,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}

func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:               aws.String(fs.config.Bucket),
		Key:                  aws.String(target),
		StorageClass:         types.StorageClass(fs.config.StorageClass),
		ACL:                  types.ObjectCannedACL(fs.config.ACL),
		ContentType:          util.NilIfEmpty(contentType),
		SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use a 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		maxPartSize = int64(500 * 1024 * 1024)
	}
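
	// With 32 MB parts, the 10,000-part multipart limit caps an object at
	// roughly 312 GiB; switching to 500 MB parts above 100 GiB keeps the
	// part count well below the limit (up to ~4.8 TiB).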
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once
	var copyError error
	var partNumber int32
	var offset int64

	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()

	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end
		guard <- struct{}{}
		if hasError.Load() {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:                         aws.String(fs.config.Bucket),
				CopySource:                     aws.String(source),
				Key:                            aws.String(target),
				PartNumber:                     &partNum,
				UploadId:                       aws.String(uploadID),
				CopySourceRange:                aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
				CopySourceSSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
				CopySourceSSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
				CopySourceSSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
				SSECustomerKey:                 util.NilIfEmpty(fs.sseCustomerKey),
				SSECustomerAlgorithm:           util.NilIfEmpty(fs.sseCustomerAlgo),
				SSECustomerKeyMD5:              util.NilIfEmpty(fs.sseCustomerKeyMD5),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()

					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: &partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)

	if copyError != nil {
		return copyError
	}
	sort.Slice(completedParts, func(i, j int) bool {
		getPartNumber := func(number *int32) int32 {
			if number == nil {
				return 0
			}
			return *number
		}
		return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
	})

	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}

func (fs *S3Fs) getPrefix(name string) string {
	prefix := ""
	if name != "" && name != "." && name != "/" {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	return prefix
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket:               aws.String(fs.config.Bucket),
		Key:                  aws.String(name),
		SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
	defer cancelFn()
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
		Bucket:               aws.String(fs.config.Bucket),
		Key:                  aws.String(name),
		SSECustomerKey:       util.NilIfEmpty(fs.sseCustomerKey),
		SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
		SSECustomerKeyMD5:    util.NilIfEmpty(fs.sseCustomerKeyMD5),
	})
	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
		name, n, err)
	metric.S3TransferCompleted(n, 1, err)
	return n, err
}

type s3DirLister struct {
	baseDirLister
	paginator     *s3.ListObjectsV2Paginator
	timeout       time.Duration
	prefix        string
	prefixes      map[string]bool
	metricUpdated bool
}

func (l *s3DirLister) resolve(name *string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), l.prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

func (l *s3DirLister) Next(limit int) ([]os.FileInfo, error) {
	if limit <= 0 {
		return nil, errInvalidDirListerLimit
	}
	if len(l.cache) >= limit {
		return l.returnFromCache(limit), nil
	}
	if !l.paginator.HasMorePages() {
		if !l.metricUpdated {
			l.metricUpdated = true
			metric.S3ListObjectsCompleted(nil)
		}
		return l.returnFromCache(limit), io.EOF
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(l.timeout))
	defer cancelFn()
	page, err := l.paginator.NextPage(ctx)
	if err != nil {
		metric.S3ListObjectsCompleted(err)
		return l.cache, err
	}
	for _, p := range page.CommonPrefixes {
		// prefixes have a trailing slash
		name, _ := l.resolve(p.Prefix)
		if name == "" {
			continue
		}
		if _, ok := l.prefixes[name]; ok {
			continue
		}
		l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
		l.prefixes[name] = true
	}
	for _, fileObject := range page.Contents {
		objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
		objectSize := util.GetIntFromPointer(fileObject.Size)
		name, isDir := l.resolve(fileObject.Key)
		if name == "" || name == "/" {
			continue
		}
		if isDir {
			if _, ok := l.prefixes[name]; ok {
				continue
			}
			l.prefixes[name] = true
		}
		l.cache = append(l.cache, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
	}
	return l.returnFromCache(limit), nil
}

func (l *s3DirLister) Close() error {
	return l.baseDirLister.Close()
}
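
// The prefixes map deduplicates directory entries: the same directory can
// appear both as a CommonPrefixes entry and as a zero-byte "dir/" marker key
// in Contents, and it must be reported only once.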

func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration, skipTLSVerify bool) *awshttp.BuildableClient {
	c := awshttp.NewBuildableClient().
		WithDialerOptions(func(d *net.Dialer) {
			d.Timeout = 8 * time.Second
		}).
		WithTransportOptions(func(tr *http.Transport) {
			tr.IdleConnTimeout = idleConnectionTimeout
			tr.WriteBufferSize = s3TransferBufferSize
			tr.ReadBufferSize = s3TransferBufferSize
			if skipTLSVerify {
				if tr.TLSClientConfig != nil {
					tr.TLSClientConfig.InsecureSkipVerify = skipTLSVerify
				} else {
					tr.TLSClientConfig = &tls.Config{
						MinVersion:         awshttp.DefaultHTTPTransportTLSMinVersion,
						InsecureSkipVerify: skipTLSVerify,
					}
				}
			}
		})
	if timeout > 0 {
		c = c.WithTimeout(time.Duration(timeout) * time.Second)
	}
	return c
}

// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483; the code below is copied from rclone
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}
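
// For example:
//
//	pathEscape("bucket/my file+v2.txt") // "bucket/my%20file%2Bv2.txt"
//
// url.URL escapes the space but leaves "+" untouched, since "+" is valid in
// a URL path; the ReplaceAll then forces "%2B" for vendors that would decode
// a literal "+" as a space.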