s3fs.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. //go:build !nos3
  15. // +build !nos3
  16. package vfs
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "mime"
  22. "net"
  23. "net/http"
  24. "net/url"
  25. "os"
  26. "path"
  27. "path/filepath"
  28. "sort"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/aws/aws-sdk-go-v2/aws"
  34. awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
  35. "github.com/aws/aws-sdk-go-v2/config"
  36. "github.com/aws/aws-sdk-go-v2/credentials"
  37. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  38. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  39. "github.com/aws/aws-sdk-go-v2/service/s3"
  40. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  41. "github.com/aws/aws-sdk-go-v2/service/sts"
  42. "github.com/eikenb/pipeat"
  43. "github.com/pkg/sftp"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/metric"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/version"
  49. )
const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType = "application/x-directory"
	// buffer size used for S3 transfers
	s3TransferBufferSize = 256 * 1024
)

var (
	// content types that identify an object as a "directory" placeholder
	s3DirMimeTypes = []string{s3DirMimeType, "httpd/unix-directory"}
)
// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	// local directory used for temporary files (pipe backing storage)
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath string
	config    *S3FsConfig
	svc       *s3.Client
	// default deadline applied to individual S3 API calls
	ctxTimeout time.Duration
}
// init registers the "+s3" feature flag for version reporting.
func init() {
	version.AddFeature("+s3")
}
// NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		// fall back to the globally configured temp path or the OS default
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(getAWSHTTPClient(0, 30*time.Second)))
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		// static credentials take effect only if a secret is configured
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	fs.setConfigDefaults()
	if fs.config.RoleARN != "" {
		// assuming a role replaces any credentials configured above
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.UsePathStyle = fs.config.ForcePathStyle
		if fs.config.Endpoint != "" {
			o.BaseEndpoint = aws.String(fs.config.Endpoint)
		}
	})
	return fs, nil
}
// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}
  129. // Stat returns a FileInfo describing the named file
  130. func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
  131. var result *FileInfo
  132. if name == "" || name == "/" || name == "." {
  133. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  134. }
  135. if fs.config.KeyPrefix == name+"/" {
  136. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  137. }
  138. obj, err := fs.headObject(name)
  139. if err == nil {
  140. // Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
  141. // So we check some common content types to detect if this is a "directory".
  142. isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
  143. if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
  144. _, err = fs.headObject(name + "/")
  145. isDir = err == nil
  146. }
  147. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength),
  148. util.GetTimeFromPointer(obj.LastModified), false))
  149. }
  150. if !fs.IsNotExist(err) {
  151. return result, err
  152. }
  153. // now check if this is a prefix (virtual directory)
  154. hasContents, err := fs.hasContents(name)
  155. if err == nil && hasContents {
  156. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  157. } else if err != nil {
  158. return nil, err
  159. }
  160. // the requested file may still be a directory as a zero bytes key
  161. // with a trailing forward slash (created using mkdir).
  162. // S3 doesn't return content type when listing objects, so we have
  163. // create "dirs" adding a trailing "/" to the key
  164. return fs.getStatForDir(name)
  165. }
  166. func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
  167. var result *FileInfo
  168. obj, err := fs.headObject(name + "/")
  169. if err != nil {
  170. return result, err
  171. }
  172. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength),
  173. util.GetTimeFromPointer(obj.LastModified), false))
  174. }
// Lstat returns a FileInfo describing the named file.
// Symlinks are not supported on S3, so this is the same as Stat.
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	// the download is streamed through a pipe backed by a file in the local temp dir
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			// enforce a per-part timeout using a dedicated HTTP client
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		// resume from the requested offset via an HTTP Range request
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		// propagate the download result (or nil on success) to the reader side
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		// fail early if the parent "directory" does not exist
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	// the upload is streamed through a pipe backed by a file in the local temp dir
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			// enforce a per-part timeout using a dedicated HTTP client
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
		if flag == -1 {
			// flag == -1 is used by mkdirInternal to create directory placeholders
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		// propagate the upload result to the writer side and signal completion
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %v, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}
// Rename renames (moves) source to target.
// The target's parent must already exist; the source must be statable.
func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
	if source == target {
		return -1, -1, nil
	}
	_, err := fs.Stat(path.Dir(target))
	if err != nil {
		return -1, -1, err
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return -1, -1, err
	}
	return fs.renameInternal(source, target, fi)
}
// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %q", name)
		}
		if !strings.HasSuffix(name, "/") {
			// directory placeholders are stored with a trailing slash
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		// best-effort cleanup of plugin-stored metadata; a failure is only logged
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %q: %+v", name, errMetadata)
		}
	}
	return err
}
// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		// either the path already exists (err == nil, so nil is returned)
		// or Stat failed for another reason: return err as-is
		return err
	}
	return fs.mkdirInternal(name)
}
// Symlink creates source as a symbolic link to target.
// Not supported on S3.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}
// Readlink returns the destination of the named symbolic link.
// Not supported on S3.
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}
// Chown changes the numeric uid and gid of the named file.
// Not supported on S3.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}
// Chmod changes the mode of the named file to mode.
// Not supported on S3.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}
// Chtimes changes the access and modification times of the named file.
// It is supported only when a metadata plugin is configured, and only
// for regular files (directories return ErrVfsUnsupported).
func (fs *S3Fs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		// while uploading, Stat would not see the object yet, so the
		// check is skipped in that case
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}
// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}
  349. // ReadDir reads the directory named by dirname and returns
  350. // a list of directory entries.
  351. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  352. var result []os.FileInfo
  353. // dirname must be already cleaned
  354. prefix := fs.getPrefix(dirname)
  355. modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
  356. if err != nil {
  357. return result, err
  358. }
  359. prefixes := make(map[string]bool)
  360. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  361. Bucket: aws.String(fs.config.Bucket),
  362. Prefix: aws.String(prefix),
  363. Delimiter: aws.String("/"),
  364. })
  365. for paginator.HasMorePages() {
  366. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  367. defer cancelFn()
  368. page, err := paginator.NextPage(ctx)
  369. if err != nil {
  370. metric.S3ListObjectsCompleted(err)
  371. return result, err
  372. }
  373. for _, p := range page.CommonPrefixes {
  374. // prefixes have a trailing slash
  375. name, _ := fs.resolve(p.Prefix, prefix)
  376. if name == "" {
  377. continue
  378. }
  379. if _, ok := prefixes[name]; ok {
  380. continue
  381. }
  382. result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  383. prefixes[name] = true
  384. }
  385. for _, fileObject := range page.Contents {
  386. objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
  387. objectSize := util.GetIntFromPointer(fileObject.Size)
  388. name, isDir := fs.resolve(fileObject.Key, prefix)
  389. if name == "" || name == "/" {
  390. continue
  391. }
  392. if isDir {
  393. if _, ok := prefixes[name]; ok {
  394. continue
  395. }
  396. prefixes[name] = true
  397. }
  398. if t, ok := modTimes[name]; ok {
  399. objectModTime = util.GetTimeFromMsecSinceEpoch(t)
  400. }
  401. result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize,
  402. objectModTime, false))
  403. }
  404. }
  405. metric.S3ListObjectsCompleted(nil)
  406. return result, nil
  407. }
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}
  419. // IsNotExist returns a boolean indicating whether the error is known to
  420. // report that a file or directory does not exist
  421. func (*S3Fs) IsNotExist(err error) bool {
  422. if err == nil {
  423. return false
  424. }
  425. var re *awshttp.ResponseError
  426. if errors.As(err, &re) {
  427. if re.Response != nil {
  428. return re.Response.StatusCode == http.StatusNotFound
  429. }
  430. }
  431. return false
  432. }
  433. // IsPermission returns a boolean indicating whether the error is known to
  434. // report that permission is denied.
  435. func (*S3Fs) IsPermission(err error) bool {
  436. if err == nil {
  437. return false
  438. }
  439. var re *awshttp.ResponseError
  440. if errors.As(err, &re) {
  441. if re.Response != nil {
  442. return re.Response.StatusCode == http.StatusForbidden ||
  443. re.Response.StatusCode == http.StatusUnauthorized
  444. }
  445. }
  446. return false
  447. }
  448. // IsNotSupported returns true if the error indicate an unsupported operation
  449. func (*S3Fs) IsNotSupported(err error) bool {
  450. if err == nil {
  451. return false
  452. }
  453. return err == ErrVfsUnsupported
  454. }
// CheckRootPath creates the specified local root directory if it does not exists
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
	return osFs.CheckRootPath(username, uid, gid)
}
// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	return fs.GetDirSize(fs.config.KeyPrefix)
}
  466. func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
  467. fileNames := make(map[string]bool)
  468. prefix := ""
  469. if fsPrefix != "/" {
  470. prefix = strings.TrimPrefix(fsPrefix, "/")
  471. }
  472. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  473. Bucket: aws.String(fs.config.Bucket),
  474. Prefix: aws.String(prefix),
  475. Delimiter: aws.String("/"),
  476. })
  477. for paginator.HasMorePages() {
  478. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  479. defer cancelFn()
  480. page, err := paginator.NextPage(ctx)
  481. if err != nil {
  482. metric.S3ListObjectsCompleted(err)
  483. if err != nil {
  484. fsLog(fs, logger.LevelError, "unable to get content for prefix %q: %+v", prefix, err)
  485. return nil, err
  486. }
  487. return fileNames, err
  488. }
  489. for _, fileObject := range page.Contents {
  490. name, isDir := fs.resolve(fileObject.Key, prefix)
  491. if name != "" && !isDir {
  492. fileNames[name] = true
  493. }
  494. }
  495. }
  496. metric.S3ListObjectsCompleted(nil)
  497. return fileNames, nil
  498. }
// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}
  503. // GetDirSize returns the number of files and the size for a folder
  504. // including any subfolders
  505. func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
  506. prefix := fs.getPrefix(dirname)
  507. numFiles := 0
  508. size := int64(0)
  509. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  510. Bucket: aws.String(fs.config.Bucket),
  511. Prefix: aws.String(prefix),
  512. })
  513. for paginator.HasMorePages() {
  514. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  515. defer cancelFn()
  516. page, err := paginator.NextPage(ctx)
  517. if err != nil {
  518. metric.S3ListObjectsCompleted(err)
  519. return numFiles, size, err
  520. }
  521. for _, fileObject := range page.Contents {
  522. isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
  523. objectSize := util.GetIntFromPointer(fileObject.Size)
  524. if isDir && objectSize == 0 {
  525. continue
  526. }
  527. numFiles++
  528. size += objectSize
  529. if numFiles%1000 == 0 {
  530. fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
  531. }
  532. }
  533. }
  534. metric.S3ListObjectsCompleted(nil)
  535. return numFiles, size, nil
  536. }
// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}
  542. // GetRelativePath returns the path for a file relative to the user's home dir.
  543. // This is the path as seen by SFTPGo users
  544. func (fs *S3Fs) GetRelativePath(name string) string {
  545. rel := path.Clean(name)
  546. if rel == "." {
  547. rel = ""
  548. }
  549. if !path.IsAbs(rel) {
  550. rel = "/" + rel
  551. }
  552. if fs.config.KeyPrefix != "" {
  553. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  554. rel = "/"
  555. }
  556. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  557. }
  558. if fs.mountPath != "" {
  559. rel = path.Join(fs.mountPath, rel)
  560. }
  561. return rel
  562. }
  563. // Walk walks the file tree rooted at root, calling walkFn for each file or
  564. // directory in the tree, including root. The result are unordered
  565. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  566. prefix := fs.getPrefix(root)
  567. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  568. Bucket: aws.String(fs.config.Bucket),
  569. Prefix: aws.String(prefix),
  570. })
  571. for paginator.HasMorePages() {
  572. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  573. defer cancelFn()
  574. page, err := paginator.NextPage(ctx)
  575. if err != nil {
  576. metric.S3ListObjectsCompleted(err)
  577. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
  578. return err
  579. }
  580. for _, fileObject := range page.Contents {
  581. name, isDir := fs.resolve(fileObject.Key, prefix)
  582. if name == "" {
  583. continue
  584. }
  585. err := walkFn(util.GetStringFromPointer(fileObject.Key),
  586. NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size),
  587. util.GetTimeFromPointer(fileObject.LastModified), false), nil)
  588. if err != nil {
  589. return err
  590. }
  591. }
  592. }
  593. metric.S3ListObjectsCompleted(nil)
  594. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
  595. return nil
  596. }
// Join joins any number of path elements into a single path.
// The leading "/" is stripped since S3 keys are not rooted.
func (*S3Fs) Join(elem ...string) string {
	return strings.TrimPrefix(path.Join(elem...), "/")
}
// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}
// ResolvePath returns the matching filesystem path for the specified virtual path.
// The mount path, if any, is stripped and the configured key prefix is prepended.
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}
// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) error {
	return fs.copyFileInternal(source, target, srcSize)
}
  619. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  620. result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
  621. isDir := strings.HasSuffix(result, "/")
  622. if isDir {
  623. result = strings.TrimSuffix(result, "/")
  624. }
  625. return result, isDir
  626. }
  627. func (fs *S3Fs) setConfigDefaults() {
  628. if fs.config.UploadPartSize == 0 {
  629. fs.config.UploadPartSize = manager.DefaultUploadPartSize
  630. } else {
  631. if fs.config.UploadPartSize < 1024*1024 {
  632. fs.config.UploadPartSize *= 1024 * 1024
  633. }
  634. }
  635. if fs.config.UploadConcurrency == 0 {
  636. fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
  637. }
  638. if fs.config.DownloadPartSize == 0 {
  639. fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
  640. } else {
  641. if fs.config.DownloadPartSize < 1024*1024 {
  642. fs.config.DownloadPartSize *= 1024 * 1024
  643. }
  644. }
  645. if fs.config.DownloadConcurrency == 0 {
  646. fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
  647. }
  648. }
// copyFileInternal copies source to target within the bucket using a
// server side copy. Files larger than 500 MB are copied using a
// multipart copy.
func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	// CopySource must include the bucket and be URL-escaped
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))
	if fileSize > 500*1024*1024 {
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fileSize)
		err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(copySource),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	metric.S3CopyObjectCompleted(err)
	return err
}
// renameInternal renames source to target and returns the number of files
// and the total size renamed. Files are copy+delete; for directories the
// behavior depends on the global renameMode: 0 only renames empty
// directories, 1 recursively renames the contents too.
func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if fi.IsDir() {
		if renameMode == 0 {
			// renameMode 0: refuse to rename non-empty directories
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("cannot rename non empty directory: %q", source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			// renameMode 1: recursively rename each entry
			entries, err := fs.ReadDir(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			for _, info := range entries {
				sourceEntry := fs.Join(source, info.Name())
				targetEntry := fs.Join(target, info.Name())
				files, size, err := fs.renameInternal(sourceEntry, targetEntry, info)
				if err != nil {
					if fs.IsNotExist(err) {
						// the entry vanished during the walk: skip it
						fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
						continue
					}
					return numFiles, filesSize, err
				}
				numFiles += files
				filesSize += size
			}
		}
	} else {
		// regular file: server side copy, then delete the source below
		if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += fi.Size()
		if plugin.Handler.HasMetadater() {
			// best-effort: carry the stored modification time over to the target
			err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %q -> %q: %+v",
					source, target, err)
			}
		}
	}
	err := fs.Remove(source, fi.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}
  729. func (fs *S3Fs) mkdirInternal(name string) error {
  730. if !strings.HasSuffix(name, "/") {
  731. name += "/"
  732. }
  733. _, w, _, err := fs.Create(name, -1, 0)
  734. if err != nil {
  735. return err
  736. }
  737. return w.Close()
  738. }
// hasContents returns true if the specified "directory" prefix contains
// at least one real entry.
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	// two keys are enough: the prefix placeholder itself plus one real entry
	maxKeys := int32(2)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxKeys,
	})
	// only the first page is inspected, the limited MaxKeys makes it enough
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			// skip the key that represents the prefix itself
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}
// doMultipartCopy copies the object source to target server-side, using the
// S3 multipart upload API with UploadPartCopy so that parts can be copied
// concurrently. On any part failure the multipart upload is aborted and the
// first error is returned.
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		// bigger parts for very large files, presumably to stay within
		// S3's per-upload part-count limit — TODO confirm
		maxPartSize = int64(500 * 1024 * 1024)
	}
	// guard is a semaphore bounding the number of in-flight part copies to 10
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex // protects completedParts
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once
	// copyError is written once under errOnce and read only after wg.Wait()
	var copyError error
	var partNumber int32
	var offset int64
	// opCtx lets the first failing part cancel all the other in-flight copies
	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()
	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end
		guard <- struct{}{} // blocks while 10 copies are already running
		if hasError.Load() {
			// a previous part failed: stop scheduling new parts
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:     aws.String(fs.config.Bucket),
				CopySource: aws.String(source),
				Key:        aws.String(target),
				PartNumber: &partNum,
				UploadId:   aws.String(uploadID),
				// the range end is inclusive in the S3 API, hence partEnd-1
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				// only the first error aborts the upload and is reported
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()
					// use a fresh context: opCtx is already canceled at this point
					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: &partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)
	if copyError != nil {
		return copyError
	}
	// parts may have completed out of order; CompleteMultipartUpload
	// expects them sorted by part number
	sort.Slice(completedParts, func(i, j int) bool {
		getPartNumber := func(number *int32) int32 {
			if number == nil {
				return 0
			}
			return *number
		}
		return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
	})
	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}
  887. func (fs *S3Fs) getPrefix(name string) string {
  888. prefix := ""
  889. if name != "" && name != "." && name != "/" {
  890. prefix = strings.TrimPrefix(name, "/")
  891. if !strings.HasSuffix(prefix, "/") {
  892. prefix += "/"
  893. }
  894. }
  895. return prefix
  896. }
  897. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  898. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  899. defer cancelFn()
  900. obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
  901. Bucket: aws.String(fs.config.Bucket),
  902. Key: aws.String(name),
  903. })
  904. metric.S3HeadObjectCompleted(err)
  905. return obj, err
  906. }
  907. // GetMimeType returns the content type
  908. func (fs *S3Fs) GetMimeType(name string) (string, error) {
  909. obj, err := fs.headObject(name)
  910. if err != nil {
  911. return "", err
  912. }
  913. return util.GetStringFromPointer(obj.ContentType), nil
  914. }
  915. // Close closes the fs
  916. func (*S3Fs) Close() error {
  917. return nil
  918. }
  919. // GetAvailableDiskSize returns the available size for the specified path
  920. func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
  921. return nil, ErrStorageSizeUnavailable
  922. }
  923. func (fs *S3Fs) getStorageID() string {
  924. if fs.config.Endpoint != "" {
  925. if !strings.HasSuffix(fs.config.Endpoint, "/") {
  926. return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
  927. }
  928. return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
  929. }
  930. return fmt.Sprintf("s3://%v", fs.config.Bucket)
  931. }
  932. func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration) *awshttp.BuildableClient {
  933. c := awshttp.NewBuildableClient().
  934. WithDialerOptions(func(d *net.Dialer) {
  935. d.Timeout = 8 * time.Second
  936. }).
  937. WithTransportOptions(func(tr *http.Transport) {
  938. tr.IdleConnTimeout = idleConnectionTimeout
  939. tr.WriteBufferSize = s3TransferBufferSize
  940. tr.ReadBufferSize = s3TransferBufferSize
  941. })
  942. if timeout > 0 {
  943. c = c.WithTimeout(time.Duration(timeout) * time.Second)
  944. }
  945. return c
  946. }
  947. // ideally we should simply use url.PathEscape:
  948. //
  949. // https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
  950. //
  951. // but this cause issue with some vendors, see #483, the code below is copied from rclone
  952. func pathEscape(in string) string {
  953. var u url.URL
  954. u.Path = in
  955. return strings.ReplaceAll(u.String(), "+", "%2B")
  956. }