s3fs.go

// Copyright (C) 2019-2023 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3
// +build !nos3

package vfs
import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"mime"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/internal/logger"
	"github.com/drakkan/sftpgo/v2/internal/metric"
	"github.com/drakkan/sftpgo/v2/internal/plugin"
	"github.com/drakkan/sftpgo/v2/internal/util"
	"github.com/drakkan/sftpgo/v2/internal/version"
)
const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType        = "application/x-directory"
	s3TransferBufferSize = 256 * 1024
)

var (
	s3DirMimeTypes = []string{s3DirMimeType, "httpd/unix-directory"}
)
// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty this fs is mounted as a virtual folder in the specified path
	mountPath  string
	config     *S3FsConfig
	svc        *s3.Client
	ctxTimeout time.Duration
}
func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		localTempDir = getLocalTempDir()
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(
		getAWSHTTPClient(0, 30*time.Second, fs.config.SkipTLSVerify)),
	)
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	fs.setConfigDefaults()

	if fs.config.RoleARN != "" {
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.AppID = fmt.Sprintf("SFTPGo-%s", version.Get().CommitHash)
		o.UsePathStyle = fs.config.ForcePathStyle
		if fs.config.Endpoint != "" {
			o.BaseEndpoint = aws.String(fs.config.Endpoint)
		}
	})
	return fs, nil
}
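// Note: an empty localTempDir falls back to getLocalTempDir() and an empty
// mountPath means the Fs is not mounted as a virtual folder; credentials
// come from the AWS SDK default chain unless a static access secret or a
// role ARN is configured above.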
// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "" || name == "/" || name == "." {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	}
	if fs.config.KeyPrefix == name+"/" {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	}
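	// Lookup order: the exact object key first, then a zero-byte "name/"
	// directory marker, and finally a bare prefix with contents (a virtual
	// directory implied by deeper keys).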
	obj, err := fs.headObject(name)
	if err == nil {
		// Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
		// So we check some common content types to detect if this is a "directory".
		isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
		if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
			_, err = fs.headObject(name + "/")
			isDir = err == nil
		}
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength),
			util.GetTimeFromPointer(obj.LastModified), false))
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory stored as a zero-byte key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have to
	// create "dirs" by adding a trailing "/" to the key
	return fs.getStatForDir(name)
}
func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength),
		util.GetTimeFromPointer(obj.LastModified), false))
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeReader(r)
	if readMetadata > 0 {
		attrs, err := fs.headObject(name)
		if err != nil {
			r.Close()
			w.Close()
			return nil, nil, nil, err
		}
		p.setMetadata(attrs.Metadata)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
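	// The download runs in a background goroutine that writes into the pipe;
	// the caller reads from the returned PipeReader. Closing the writer with
	// the downloader's error propagates any failure to the reader side.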
	go func() {
		defer cancelFn()

		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, p, cancelFn, nil
}
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	var p PipeWriter
	if checks&CheckResume != 0 {
		p = newPipeWriterAtOffset(w, 0)
	} else {
		p = NewPipeWriter(w)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
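	// flag == -1 is an internal convention (see mkdirInternal): the object is
	// a zero-byte directory marker and gets the directory MIME type.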
	go func() {
		defer cancelFn()

		var contentType string
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, read bytes: %d, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
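	// For a resumed upload the existing object is first streamed back into
	// the pipe writer, then the writer offsets are reset so that new data is
	// appended right after the downloaded bytes.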
	if checks&CheckResume != 0 {
		readCh := make(chan error, 1)

		go func() {
			n, err := fs.downloadToWriter(name, p)
			pw := p.(*pipeWriterAtOffset)
			pw.offset = 0
			pw.writeOffset = n
			readCh <- err
		}()

		err = <-readCh
		if err != nil {
			cancelFn()
			p.Close()
			fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
			return nil, nil, nil, err
		}
	}
	if uploadMode&4 != 0 {
		return nil, p, nil, nil
	}
	return nil, p, cancelFn, nil
}
// Rename renames (moves) source to target.
func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
	if source == target {
		return -1, -1, nil
	}
	_, err := fs.Stat(path.Dir(target))
	if err != nil {
		return -1, -1, err
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return -1, -1, err
	}
	return fs.renameInternal(source, target, fi)
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %q", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
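	// Directory markers are zero-byte keys with a trailing "/", so the suffix
	// added above makes DeleteObject target the marker object itself.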
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %q: %+v", name, errMetadata)
		}
	}
	return err
}
// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	return fs.mkdirInternal(name)
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := fs.getPrefix(dirname)
	modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
	if err != nil {
		return result, err
	}
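	// prefixes tracks directory names already emitted so that a CommonPrefixes
	// entry and a zero-byte "dir/" marker for the same name produce a single
	// listing entry.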
	prefixes := make(map[string]bool)

	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return result, err
		}
		for _, p := range page.CommonPrefixes {
			// prefixes have a trailing slash
			name, _ := fs.resolve(p.Prefix, prefix)
			if name == "" {
				continue
			}
			if _, ok := prefixes[name]; ok {
				continue
			}
			result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
			prefixes[name] = true
		}
		for _, fileObject := range page.Contents {
			objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
			objectSize := util.GetIntFromPointer(fileObject.Size)
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			if isDir {
				if _, ok := prefixes[name]; ok {
					continue
				}
				prefixes[name] = true
			}
			if t, ok := modTimes[name]; ok {
				objectModTime = util.GetTimeFromMsecSinceEpoch(t)
			}
			result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize,
				objectModTime, false))
		}
	}
	metric.S3ListObjectsCompleted(nil)
	return result, nil
}
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
	return size <= resumeMaxSize
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusNotFound
		}
	}
	return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusForbidden ||
				re.Response.StatusCode == http.StatusUnauthorized
		}
	}
	return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return err == ErrVfsUnsupported
}
// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	return fs.GetDirSize(fs.config.KeyPrefix)
}
func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
	fileNames := make(map[string]bool)
	prefix := ""
	if fsPrefix != "/" {
		prefix = strings.TrimPrefix(fsPrefix, "/")
	}
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			fsLog(fs, logger.LevelError, "unable to get content for prefix %q: %+v", prefix, err)
			return nil, err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name != "" && !isDir {
				fileNames[name] = true
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	return fileNames, nil
}
// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
	prefix := fs.getPrefix(dirname)
	numFiles := 0
	size := int64(0)

	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return numFiles, size, err
		}
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
			objectSize := util.GetIntFromPointer(fileObject.Size)
			if isDir && objectSize == 0 {
				continue
			}
			numFiles++
			size += objectSize
			if numFiles%1000 == 0 {
				fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	return numFiles, size, nil
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}
// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := fs.getPrefix(root)

	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
			return err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			err := walkFn(util.GetStringFromPointer(fileObject.Key),
				NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size), util.GetTimeFromPointer(fileObject.LastModified),
					false), nil)
			if err != nil {
				return err
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
	return nil
}
// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return strings.TrimPrefix(path.Join(elem...), "/")
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) error {
	return fs.copyFileInternal(source, target, srcSize)
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}
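// setConfigDefaults falls back to the SDK manager defaults; part sizes are
// configured in MiB, so values smaller than 1 MiB are interpreted as a MiB
// count and scaled accordingly.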
func (fs *S3Fs) setConfigDefaults() {
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = manager.DefaultUploadPartSize
	} else {
		if fs.config.UploadPartSize < 1024*1024 {
			fs.config.UploadPartSize *= 1024 * 1024
		}
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
	} else {
		if fs.config.DownloadPartSize < 1024*1024 {
			fs.config.DownloadPartSize *= 1024 * 1024
		}
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
	}
}
func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))

	if fileSize > 500*1024*1024 {
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fileSize)
		err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	_, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(copySource),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	metric.S3CopyObjectCompleted(err)
	return err
}
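// renameInternal copies the source to the target and then removes the source.
// With renameMode 0 only empty directories can be renamed, while renameMode 1
// recursively copies the directory contents.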
func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo) (int, int64, error) {
	var numFiles int
	var filesSize int64

	if fi.IsDir() {
		if renameMode == 0 {
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("cannot rename non empty directory: %q", source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			entries, err := fs.ReadDir(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			for _, info := range entries {
				sourceEntry := fs.Join(source, info.Name())
				targetEntry := fs.Join(target, info.Name())
				files, size, err := fs.renameInternal(sourceEntry, targetEntry, info)
				if err != nil {
					if fs.IsNotExist(err) {
						fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
						continue
					}
					return numFiles, filesSize, err
				}
				numFiles += files
				filesSize += size
			}
		}
	} else {
		if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += fi.Size()
		if plugin.Handler.HasMetadater() {
			err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %q -> %q: %+v",
					source, target, err)
			}
		}
	}
	err := fs.Remove(source, fi.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}
func (fs *S3Fs) mkdirInternal(name string) error {
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	_, w, _, err := fs.Create(name, -1, 0)
	if err != nil {
		return err
	}
	return w.Close()
}

func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	maxKeys := int32(2)
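	// Listing at most two keys is enough to decide: the directory marker
	// itself resolves to an empty name and is skipped below, so any other
	// returned key proves the prefix has contents.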
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxKeys,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use a 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines.
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		maxPartSize = int64(500 * 1024 * 1024)
	}
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once
	var copyError error
	var partNumber int32
	var offset int64

	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()
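	// guard acts as a counting semaphore: each in-flight UploadPartCopy holds
	// a slot, capping the copy concurrency at 10 parts.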
	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end

		guard <- struct{}{}
		if hasError.Load() {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}

		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard
				wg.Done()
			}()

			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()

			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:          aws.String(fs.config.Bucket),
				CopySource:      aws.String(source),
				Key:             aws.String(target),
				PartNumber:      &partNum,
				UploadId:        aws.String(uploadID),
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()

					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()

					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}

			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: &partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)

	if copyError != nil {
		return copyError
	}
	sort.Slice(completedParts, func(i, j int) bool {
		getPartNumber := func(number *int32) int32 {
			if number == nil {
				return 0
			}
			return *number
		}
		return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
	})

	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()

	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}
func (fs *S3Fs) getPrefix(name string) string {
	prefix := ""
	if name != "" && name != "." && name != "/" {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	return prefix
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}
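// downloadToWriter streams an existing object into the pipe writer used by
// Create before a resumed upload; the whole download is bounded by
// preResumeTimeout.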
func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
	defer cancelFn()

	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
		name, n, err)
	metric.S3TransferCompleted(n, 1, err)
	return n, err
}
func (fs *S3Fs) getStorageID() string {
	if fs.config.Endpoint != "" {
		if !strings.HasSuffix(fs.config.Endpoint, "/") {
			return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
		}
		return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
	}
	return fmt.Sprintf("s3://%v", fs.config.Bucket)
}
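// getAWSHTTPClient builds the HTTP client used by the SDK: timeout is the
// overall request timeout in seconds (0 disables it), while
// idleConnectionTimeout bounds how long idle connections are kept open.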
func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration, skipTLSVerify bool) *awshttp.BuildableClient {
	c := awshttp.NewBuildableClient().
		WithDialerOptions(func(d *net.Dialer) {
			d.Timeout = 8 * time.Second
		}).
		WithTransportOptions(func(tr *http.Transport) {
			tr.IdleConnTimeout = idleConnectionTimeout
			tr.WriteBufferSize = s3TransferBufferSize
			tr.ReadBufferSize = s3TransferBufferSize
			if skipTLSVerify {
				if tr.TLSClientConfig != nil {
					tr.TLSClientConfig.InsecureSkipVerify = skipTLSVerify
				} else {
					tr.TLSClientConfig = &tls.Config{
						MinVersion:         awshttp.DefaultHTTPTransportTLSMinVersion,
						InsecureSkipVerify: skipTLSVerify,
					}
				}
			}
		})
	if timeout > 0 {
		c = c.WithTimeout(time.Duration(timeout) * time.Second)
	}
	return c
}
// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483; the code below is copied from rclone
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}