s3fs.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. //go:build !nos3
  15. // +build !nos3
  16. package vfs
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "mime"
  22. "net"
  23. "net/http"
  24. "net/url"
  25. "os"
  26. "path"
  27. "path/filepath"
  28. "sort"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/aws/aws-sdk-go-v2/aws"
  34. awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
  35. "github.com/aws/aws-sdk-go-v2/config"
  36. "github.com/aws/aws-sdk-go-v2/credentials"
  37. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  38. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  39. "github.com/aws/aws-sdk-go-v2/service/s3"
  40. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  41. "github.com/aws/aws-sdk-go-v2/service/sts"
  42. "github.com/eikenb/pipeat"
  43. "github.com/pkg/sftp"
  44. "github.com/drakkan/sftpgo/v2/logger"
  45. "github.com/drakkan/sftpgo/v2/metric"
  46. "github.com/drakkan/sftpgo/v2/plugin"
  47. "github.com/drakkan/sftpgo/v2/util"
  48. "github.com/drakkan/sftpgo/v2/version"
  49. )
const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType = "application/x-directory"
	// buffer size for S3 transfers (256 KiB); not referenced in this chunk,
	// presumably used by transfer code elsewhere in the file — confirm
	s3TransferBufferSize = 256 * 1024
)
// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	// connection this filesystem instance belongs to
	connectionID string
	// local directory used for temporary files (e.g. pipe backing storage)
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath string
	config    *S3FsConfig
	svc       *s3.Client
	// timeout applied to individual S3 API calls (head, list page, copy, delete)
	ctxTimeout time.Duration
}
// init registers the "+s3" feature flag so the build advertises S3 support.
func init() {
	version.AddFeature("+s3")
}
  68. // NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
  69. // object storage
  70. func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
  71. if localTempDir == "" {
  72. if tempPath != "" {
  73. localTempDir = tempPath
  74. } else {
  75. localTempDir = filepath.Clean(os.TempDir())
  76. }
  77. }
  78. fs := &S3Fs{
  79. connectionID: connectionID,
  80. localTempDir: localTempDir,
  81. mountPath: getMountPath(mountPath),
  82. config: &s3Config,
  83. ctxTimeout: 30 * time.Second,
  84. }
  85. if err := fs.config.validate(); err != nil {
  86. return fs, err
  87. }
  88. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  89. defer cancel()
  90. awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(getAWSHTTPClient(0, 30*time.Second)))
  91. if err != nil {
  92. return fs, fmt.Errorf("unable to get AWS config: %w", err)
  93. }
  94. if fs.config.Region != "" {
  95. awsConfig.Region = fs.config.Region
  96. }
  97. if !fs.config.AccessSecret.IsEmpty() {
  98. if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
  99. return fs, err
  100. }
  101. awsConfig.Credentials = aws.NewCredentialsCache(
  102. credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
  103. }
  104. if fs.config.Endpoint != "" {
  105. endpointResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...any) (aws.Endpoint, error) {
  106. return aws.Endpoint{
  107. URL: fs.config.Endpoint,
  108. HostnameImmutable: fs.config.ForcePathStyle,
  109. PartitionID: "aws",
  110. SigningRegion: fs.config.Region,
  111. Source: aws.EndpointSourceCustom,
  112. }, nil
  113. })
  114. awsConfig.EndpointResolverWithOptions = endpointResolver
  115. }
  116. fs.setConfigDefaults()
  117. if fs.config.RoleARN != "" {
  118. client := sts.NewFromConfig(awsConfig)
  119. creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
  120. awsConfig.Credentials = creds
  121. }
  122. fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
  123. o.UsePathStyle = fs.config.ForcePathStyle
  124. })
  125. return fs, nil
  126. }
  127. // Name returns the name for the Fs implementation
  128. func (fs *S3Fs) Name() string {
  129. return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
  130. }
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}
// Stat returns a FileInfo describing the named file.
// Lookup order: bucket root, configured key prefix, exact object key,
// virtual directory (common prefix), and finally a "name/" marker object.
// NOTE(review): error paths return the typed-nil `result` (*FileInfo) inside
// the os.FileInfo interface, so the returned interface is non-nil even on
// failure — callers must check the error, not the FileInfo.
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "" || name == "/" || name == "." {
		// the bucket root: verify the bucket is reachable
		err := fs.checkIfBucketExists()
		if err != nil {
			return result, err
		}
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	if fs.config.KeyPrefix == name+"/" {
		// the configured prefix itself is always a virtual directory
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// a "dir" has a trailing "/" so we cannot have a directory here
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, false, obj.ContentLength,
			util.GetTimeFromPointer(obj.LastModified), false))
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory as a zero bytes key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return content type when listing objects, so we have
	// create "dirs" adding a trailing "/" to the key
	return fs.getStatForDir(name)
}
  170. func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
  171. var result *FileInfo
  172. obj, err := fs.headObject(name + "/")
  173. if err != nil {
  174. return result, err
  175. }
  176. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, obj.ContentLength,
  177. util.GetTimeFromPointer(obj.LastModified), false))
  178. }
// Lstat returns a FileInfo describing the named file.
// S3 has no symlinks, so Lstat is identical to Stat.
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
// Open opens the named file for reading.
// The object is downloaded in a background goroutine and streamed to the
// caller through a pipe backed by a temporary file in localTempDir.
// The returned cancel func aborts the in-flight download.
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		// the per-part timeout is only enforced for full downloads (offset 0)
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	// resume from offset via an HTTP Range request
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		// closing with the error propagates any download failure to the reader
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing.
// Data written to the returned PipeWriter is uploaded by a background
// goroutine using the S3 upload manager. flag == -1 is a special marker
// used by Mkdir to upload a directory placeholder object.
func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
		if flag == -1 {
			// directory placeholder: use the s3fs-fuse compatible mime type
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		r.CloseWithError(err) //nolint:errcheck
		// signal completion (or failure) to the writer side
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, acl: %#v, readed bytes: %v, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}
// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a CopyObject call.
// The sequence is: copy (single or multipart), wait for the target to
// exist, preserve metadata mtime if a metadater plugin is active, then
// delete the source.
func (fs *S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	copySource := fs.Join(fs.config.Bucket, source)
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non empty directory: %#v", source)
		}
		// directory markers are "key/" objects: keep the trailing slash on
		// both the copy source and the target
		if !strings.HasSuffix(copySource, "/") {
			copySource += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	var contentType string
	if fi.IsDir() {
		contentType = s3DirMimeType
	} else {
		contentType = mime.TypeByExtension(path.Ext(source))
	}
	copySource = pathEscape(copySource)
	if fi.Size() > 500*1024*1024 {
		// single CopyObject is limited for big objects, use multipart copy
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fi.Size())
		err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
	} else {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		_, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			CopySource:   aws.String(copySource),
			Key:          aws.String(target),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			ContentType:  util.NilIfEmpty(contentType),
		})
	}
	if err != nil {
		metric.S3CopyObjectCompleted(err)
		return err
	}
	// wait (max 10s) until the copied object is visible before deleting source
	waiter := s3.NewObjectExistsWaiter(fs.svc)
	err = waiter.Wait(context.Background(), &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(target),
	}, 10*time.Second)
	metric.S3CopyObjectCompleted(err)
	if err != nil {
		return err
	}
	if plugin.Handler.HasMetadater() {
		if !fi.IsDir() {
			// best effort: preserve the source modification time on the target
			err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
					source, target, err)
			}
		}
	}
	return fs.Remove(source, fi.IsDir())
}
// Remove removes the named file or (empty) directory.
// Directories are refused when non empty; an empty directory is deleted
// by removing its "name/" marker object.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %#v", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	// best effort: drop any plugin-stored modification time for the file
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %+v", name, errMetadata)
		}
	}
	return err
}
  362. // Mkdir creates a new directory with the specified name and default permissions
  363. func (fs *S3Fs) Mkdir(name string) error {
  364. _, err := fs.Stat(name)
  365. if !fs.IsNotExist(err) {
  366. return err
  367. }
  368. if !strings.HasSuffix(name, "/") {
  369. name += "/"
  370. }
  371. _, w, _, err := fs.Create(name, -1)
  372. if err != nil {
  373. return err
  374. }
  375. return w.Close()
  376. }
// Symlink creates source as a symbolic link to target.
// Not supported on object storage.
func (*S3Fs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link.
// Not supported on object storage.
func (*S3Fs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
// Not supported on object storage.
func (*S3Fs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
// Not supported on object storage.
func (*S3Fs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
// Only supported through the metadater plugin, and only for regular files:
// the mtime is stored as milliseconds since epoch, atime is ignored.
func (fs *S3Fs) Chtimes(name string, atime, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		// for existing paths, reject directories
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}
  416. // ReadDir reads the directory named by dirname and returns
  417. // a list of directory entries.
  418. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  419. var result []os.FileInfo
  420. // dirname must be already cleaned
  421. prefix := fs.getPrefix(dirname)
  422. modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
  423. if err != nil {
  424. return result, err
  425. }
  426. prefixes := make(map[string]bool)
  427. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  428. Bucket: aws.String(fs.config.Bucket),
  429. Prefix: aws.String(prefix),
  430. Delimiter: aws.String("/"),
  431. })
  432. for paginator.HasMorePages() {
  433. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  434. defer cancelFn()
  435. page, err := paginator.NextPage(ctx)
  436. if err != nil {
  437. metric.S3ListObjectsCompleted(err)
  438. return result, err
  439. }
  440. for _, p := range page.CommonPrefixes {
  441. // prefixes have a trailing slash
  442. name, _ := fs.resolve(p.Prefix, prefix)
  443. if name == "" {
  444. continue
  445. }
  446. if _, ok := prefixes[name]; ok {
  447. continue
  448. }
  449. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  450. prefixes[name] = true
  451. }
  452. for _, fileObject := range page.Contents {
  453. objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
  454. name, isDir := fs.resolve(fileObject.Key, prefix)
  455. if name == "" || name == "/" {
  456. continue
  457. }
  458. if isDir {
  459. if _, ok := prefixes[name]; ok {
  460. continue
  461. }
  462. prefixes[name] = true
  463. }
  464. if t, ok := modTimes[name]; ok {
  465. objectModTime = util.GetTimeFromMsecSinceEpoch(t)
  466. }
  467. result = append(result, NewFileInfo(name, (isDir && fileObject.Size == 0), fileObject.Size,
  468. objectModTime, false))
  469. }
  470. }
  471. metric.S3ListObjectsCompleted(nil)
  472. return result, nil
  473. }
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}
  485. // IsNotExist returns a boolean indicating whether the error is known to
  486. // report that a file or directory does not exist
  487. func (*S3Fs) IsNotExist(err error) bool {
  488. if err == nil {
  489. return false
  490. }
  491. var re *awshttp.ResponseError
  492. if errors.As(err, &re) {
  493. if re.Response != nil {
  494. return re.Response.StatusCode == http.StatusNotFound
  495. }
  496. }
  497. return false
  498. }
  499. // IsPermission returns a boolean indicating whether the error is known to
  500. // report that permission is denied.
  501. func (*S3Fs) IsPermission(err error) bool {
  502. if err == nil {
  503. return false
  504. }
  505. var re *awshttp.ResponseError
  506. if errors.As(err, &re) {
  507. if re.Response != nil {
  508. return re.Response.StatusCode == http.StatusForbidden ||
  509. re.Response.StatusCode == http.StatusUnauthorized
  510. }
  511. }
  512. return false
  513. }
  514. // IsNotSupported returns true if the error indicate an unsupported operation
  515. func (*S3Fs) IsNotSupported(err error) bool {
  516. if err == nil {
  517. return false
  518. }
  519. return err == ErrVfsUnsupported
  520. }
// CheckRootPath creates the specified local root directory if it does not exists
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
	return osFs.CheckRootPath(username, uid, gid)
}
  527. // ScanRootDirContents returns the number of files contained in the bucket,
  528. // and their size
  529. func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
  530. numFiles := 0
  531. size := int64(0)
  532. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  533. Bucket: aws.String(fs.config.Bucket),
  534. Prefix: aws.String(fs.config.KeyPrefix),
  535. })
  536. for paginator.HasMorePages() {
  537. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  538. defer cancelFn()
  539. page, err := paginator.NextPage(ctx)
  540. if err != nil {
  541. metric.S3ListObjectsCompleted(err)
  542. return numFiles, size, err
  543. }
  544. for _, fileObject := range page.Contents {
  545. isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
  546. if isDir && fileObject.Size == 0 {
  547. continue
  548. }
  549. numFiles++
  550. size += fileObject.Size
  551. }
  552. }
  553. metric.S3ListObjectsCompleted(nil)
  554. return numFiles, size, nil
  555. }
  556. func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
  557. fileNames := make(map[string]bool)
  558. prefix := ""
  559. if fsPrefix != "/" {
  560. prefix = strings.TrimPrefix(fsPrefix, "/")
  561. }
  562. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  563. Bucket: aws.String(fs.config.Bucket),
  564. Prefix: aws.String(prefix),
  565. Delimiter: aws.String("/"),
  566. })
  567. for paginator.HasMorePages() {
  568. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  569. defer cancelFn()
  570. page, err := paginator.NextPage(ctx)
  571. if err != nil {
  572. metric.S3ListObjectsCompleted(err)
  573. if err != nil {
  574. fsLog(fs, logger.LevelError, "unable to get content for prefix %#v: %+v", prefix, err)
  575. return nil, err
  576. }
  577. return fileNames, err
  578. }
  579. for _, fileObject := range page.Contents {
  580. name, isDir := fs.resolve(fileObject.Key, prefix)
  581. if name != "" && !isDir {
  582. fileNames[name] = true
  583. }
  584. }
  585. }
  586. metric.S3ListObjectsCompleted(nil)
  587. return fileNames, nil
  588. }
// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders. Not supported on S3.
func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}
  603. // GetRelativePath returns the path for a file relative to the user's home dir.
  604. // This is the path as seen by SFTPGo users
  605. func (fs *S3Fs) GetRelativePath(name string) string {
  606. rel := path.Clean(name)
  607. if rel == "." {
  608. rel = ""
  609. }
  610. if !path.IsAbs(rel) {
  611. rel = "/" + rel
  612. }
  613. if fs.config.KeyPrefix != "" {
  614. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  615. rel = "/"
  616. }
  617. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  618. }
  619. if fs.mountPath != "" {
  620. rel = path.Join(fs.mountPath, rel)
  621. }
  622. return rel
  623. }
  624. // Walk walks the file tree rooted at root, calling walkFn for each file or
  625. // directory in the tree, including root. The result are unordered
  626. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  627. prefix := fs.getPrefix(root)
  628. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  629. Bucket: aws.String(fs.config.Bucket),
  630. Prefix: aws.String(prefix),
  631. })
  632. for paginator.HasMorePages() {
  633. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  634. defer cancelFn()
  635. page, err := paginator.NextPage(ctx)
  636. if err != nil {
  637. metric.S3ListObjectsCompleted(err)
  638. walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
  639. return err
  640. }
  641. for _, fileObject := range page.Contents {
  642. name, isDir := fs.resolve(fileObject.Key, prefix)
  643. if name == "" {
  644. continue
  645. }
  646. err := walkFn(util.GetStringFromPointer(fileObject.Key),
  647. NewFileInfo(name, isDir, fileObject.Size, util.GetTimeFromPointer(fileObject.LastModified), false), nil)
  648. if err != nil {
  649. return err
  650. }
  651. }
  652. }
  653. metric.S3ListObjectsCompleted(nil)
  654. walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), nil) //nolint:errcheck
  655. return nil
  656. }
  657. // Join joins any number of path elements into a single path
  658. func (*S3Fs) Join(elem ...string) string {
  659. return strings.TrimPrefix(path.Join(elem...), "/")
  660. }
// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}
  665. // ResolvePath returns the matching filesystem path for the specified virtual path
  666. func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
  667. if fs.mountPath != "" {
  668. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  669. }
  670. if !path.IsAbs(virtualPath) {
  671. virtualPath = path.Clean("/" + virtualPath)
  672. }
  673. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  674. }
  675. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  676. result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
  677. isDir := strings.HasSuffix(result, "/")
  678. if isDir {
  679. result = strings.TrimSuffix(result, "/")
  680. }
  681. return result, isDir
  682. }
// checkIfBucketExists verifies the configured bucket is reachable with the
// current credentials via a HeadBucket call.
func (fs *S3Fs) checkIfBucketExists() error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.HeadBucket(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(fs.config.Bucket),
	})
	metric.S3HeadBucketCompleted(err)
	return err
}
  692. func (fs *S3Fs) setConfigDefaults() {
  693. if fs.config.UploadPartSize == 0 {
  694. fs.config.UploadPartSize = manager.DefaultUploadPartSize
  695. } else {
  696. if fs.config.UploadPartSize < 1024*1024 {
  697. fs.config.UploadPartSize *= 1024 * 1024
  698. }
  699. }
  700. if fs.config.UploadConcurrency == 0 {
  701. fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
  702. }
  703. if fs.config.DownloadPartSize == 0 {
  704. fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
  705. } else {
  706. if fs.config.DownloadPartSize < 1024*1024 {
  707. fs.config.DownloadPartSize *= 1024 * 1024
  708. }
  709. }
  710. if fs.config.DownloadConcurrency == 0 {
  711. fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
  712. }
  713. }
// hasContents reports whether the virtual directory name contains at least
// one object other than the directory marker itself.
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
		// 2 keys are enough: the marker object plus one real entry
		MaxKeys: 2,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				// skip the directory marker for the prefix itself
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}
// doMultipartCopy server-side copies the object source to target using the
// S3 multipart API: it creates a multipart upload, copies byte ranges with
// UploadPartCopy (at most 10 parts in flight at a time), then completes the
// upload. On the first part error the whole upload is aborted and the error
// is returned.
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		// bigger parts for files above 100 GB keep the total part count low
		maxPartSize = int64(500 * 1024 * 1024)
	}
	// guard is a counting semaphore limiting the in-flight part copies to 10
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex // protects completedParts, appended from many goroutines
	var wg sync.WaitGroup
	var hasError int32 // atomic flag set on first failure, checked before launching more parts
	var errOnce sync.Once
	var copyError error
	var partNumber int32
	var offset int64
	// opCtx is canceled on the first error so in-flight part copies stop early
	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()
	for partNumber = 1; !finished; partNumber++ {
		// compute the [start, end) byte range for this part; the last part
		// is clamped to fileSize and terminates the loop
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end
		// acquire a semaphore slot before checking for errors, so the loop
		// blocks while 10 copies are already running
		guard <- struct{}{}
		if atomic.LoadInt32(&hasError) == 1 {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard // release the semaphore slot
				wg.Done()
			}()
			// per-part deadline, derived from opCtx so it is also canceled on error
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:          aws.String(fs.config.Bucket),
				CopySource:      aws.String(source),
				Key:             aws.String(target),
				PartNumber:      partNum,
				UploadId:        aws.String(uploadID),
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				// only the first failing part records the error, cancels the
				// other copies and aborts the multipart upload
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					atomic.StoreInt32(&hasError, 1)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()
					// use a fresh context: opCtx is canceled at this point
					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	// after Wait all goroutines are done, so copyError and completedParts
	// can be read without further synchronization
	wg.Wait()
	close(guard)
	if copyError != nil {
		return copyError
	}
	// parts finish out of order; CompleteMultipartUpload requires them sorted
	sort.Slice(completedParts, func(i, j int) bool {
		return completedParts[i].PartNumber < completedParts[j].PartNumber
	})
	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}
  855. func (fs *S3Fs) getPrefix(name string) string {
  856. prefix := ""
  857. if name != "" && name != "." && name != "/" {
  858. prefix = strings.TrimPrefix(name, "/")
  859. if !strings.HasSuffix(prefix, "/") {
  860. prefix += "/"
  861. }
  862. }
  863. return prefix
  864. }
  865. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  866. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  867. defer cancelFn()
  868. obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
  869. Bucket: aws.String(fs.config.Bucket),
  870. Key: aws.String(name),
  871. })
  872. metric.S3HeadObjectCompleted(err)
  873. return obj, err
  874. }
// GetMimeType returns the content type for the named object,
// as reported by a HEAD request on the object.
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return util.GetStringFromPointer(obj.ContentType), nil
}
// Close closes the fs. It is a no-op for S3: there is nothing to release.
func (*S3Fs) Close() error {
	return nil
}
// GetAvailableDiskSize returns the available size for the specified path.
// It always returns ErrStorageSizeUnavailable for S3.
func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}
  891. func (fs *S3Fs) getStorageID() string {
  892. if fs.config.Endpoint != "" {
  893. if !strings.HasSuffix(fs.config.Endpoint, "/") {
  894. return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
  895. }
  896. return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
  897. }
  898. return fmt.Sprintf("s3://%v", fs.config.Bucket)
  899. }
  900. func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration) *awshttp.BuildableClient {
  901. c := awshttp.NewBuildableClient().
  902. WithDialerOptions(func(d *net.Dialer) {
  903. d.Timeout = 8 * time.Second
  904. }).
  905. WithTransportOptions(func(tr *http.Transport) {
  906. tr.IdleConnTimeout = idleConnectionTimeout
  907. tr.WriteBufferSize = s3TransferBufferSize
  908. tr.ReadBufferSize = s3TransferBufferSize
  909. })
  910. if timeout > 0 {
  911. c = c.WithTimeout(time.Duration(timeout) * time.Second)
  912. }
  913. return c
  914. }
  915. // ideally we should simply use url.PathEscape:
  916. //
  917. // https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
  918. //
  919. // but this cause issue with some vendors, see #483, the code below is copied from rclone
  920. func pathEscape(in string) string {
  921. var u url.URL
  922. u.Path = in
  923. return strings.ReplaceAll(u.String(), "+", "%2B")
  924. }