// s3fs.go
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. //go:build !nos3
  15. // +build !nos3
  16. package vfs
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "mime"
  22. "net"
  23. "net/http"
  24. "net/url"
  25. "os"
  26. "path"
  27. "path/filepath"
  28. "sort"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/aws/aws-sdk-go-v2/aws"
  34. awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
  35. "github.com/aws/aws-sdk-go-v2/config"
  36. "github.com/aws/aws-sdk-go-v2/credentials"
  37. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  38. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  39. "github.com/aws/aws-sdk-go-v2/service/s3"
  40. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  41. "github.com/aws/aws-sdk-go-v2/service/sts"
  42. "github.com/eikenb/pipeat"
  43. "github.com/pkg/sftp"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/metric"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/version"
  49. )
const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType = "application/x-directory"
	// buffer size used for S3 transfers
	s3TransferBufferSize = 256 * 1024
)

var (
	// content types that mark a zero byte object as a "directory".
	// Some providers use "httpd/unix-directory" instead of our own marker
	s3DirMimeTypes = []string{s3DirMimeType, "httpd/unix-directory"}
)
// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	// local directory used as scratch space for pipe based transfers
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath string
	config    *S3FsConfig
	svc       *s3.Client
	// timeout applied to single S3 API calls (list, delete, copy and so on)
	ctxTimeout time.Duration
}
// init registers the "+s3" feature flag so builds report S3 support.
func init() {
	version.AddFeature("+s3")
}
// NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		localTempDir = getLocalTempDir()
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	// load the shared AWS config (env vars, shared credentials files and so on)
	// with a bounded timeout so a misconfigured environment cannot hang startup
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(getAWSHTTPClient(0, 30*time.Second)))
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	// explicit static credentials, if set, take precedence over the shared config
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	fs.setConfigDefaults()
	// a role ARN overrides any credentials resolved above: they are only
	// used to call STS AssumeRole
	if fs.config.RoleARN != "" {
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.AppID = fmt.Sprintf("SFTPGo-%s", version.Get().CommitHash)
		// path style addressing is required by some S3 compatible providers
		o.UsePathStyle = fs.config.ForcePathStyle
		if fs.config.Endpoint != "" {
			o.BaseEndpoint = aws.String(fs.config.Endpoint)
		}
	})
	return fs, nil
}
  118. // Name returns the name for the Fs implementation
  119. func (fs *S3Fs) Name() string {
  120. return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
  121. }
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}
// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	// the bucket root is always a directory
	if name == "" || name == "/" || name == "." {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	}
	// the configured key prefix is a virtual directory by definition
	if fs.config.KeyPrefix == name+"/" {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
		// So we check some common content types to detect if this is a "directory".
		isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
		if obj.ContentLength == 0 && !isDir {
			// a zero byte object could still be a directory marker stored
			// with a trailing slash, probe for it
			_, err = fs.headObject(name + "/")
			isDir = err == nil
		}
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, obj.ContentLength,
			util.GetTimeFromPointer(obj.LastModified), false))
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory as a zero bytes key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return content type when listing objects, so we have
	// create "dirs" adding a trailing "/" to the key
	return fs.getStatForDir(name)
}
  163. func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
  164. var result *FileInfo
  165. obj, err := fs.headObject(name + "/")
  166. if err != nil {
  167. return result, err
  168. }
  169. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, obj.ContentLength,
  170. util.GetTimeFromPointer(obj.LastModified), false))
  171. }
// Lstat returns a FileInfo describing the named file.
// S3 has no symlinks, so this is the same as Stat.
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeReader(r)
	if readMetadata > 0 {
		// fetch the object metadata before starting the download so it can
		// be exposed through the pipe reader
		attrs, err := fs.headObject(name)
		if err != nil {
			r.Close()
			w.Close()
			return nil, nil, nil, err
		}
		p.setMetadata(attrs.Metadata)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			// bound the time allowed for each part download; only applied to
			// full downloads, resumed ones (offset > 0) use the default client
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		// resume the download from the requested offset using a range request
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	// download in background: the caller reads from the pipe while the
	// downloader writes to it
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		// propagate the download outcome to the reader side of the pipe
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, p, cancelFn, nil
}
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			// bound the time allowed to upload each single part
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	// upload in background: the caller writes to the pipe while the uploader
	// reads from it
	go func() {
		defer cancelFn()
		var contentType string
		if flag == -1 {
			// flag -1 is used by mkdirInternal to create directory markers
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		r.CloseWithError(err) //nolint:errcheck
		// signal completion to the writer side
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %v, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}
  266. // Rename renames (moves) source to target.
  267. func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
  268. if source == target {
  269. return -1, -1, nil
  270. }
  271. _, err := fs.Stat(path.Dir(target))
  272. if err != nil {
  273. return -1, -1, err
  274. }
  275. fi, err := fs.Stat(source)
  276. if err != nil {
  277. return -1, -1, err
  278. }
  279. return fs.renameInternal(source, target, fi)
  280. }
  281. // Remove removes the named file or (empty) directory.
  282. func (fs *S3Fs) Remove(name string, isDir bool) error {
  283. if isDir {
  284. hasContents, err := fs.hasContents(name)
  285. if err != nil {
  286. return err
  287. }
  288. if hasContents {
  289. return fmt.Errorf("cannot remove non empty directory: %q", name)
  290. }
  291. if !strings.HasSuffix(name, "/") {
  292. name += "/"
  293. }
  294. }
  295. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  296. defer cancelFn()
  297. _, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
  298. Bucket: aws.String(fs.config.Bucket),
  299. Key: aws.String(name),
  300. })
  301. metric.S3DeleteObjectCompleted(err)
  302. if plugin.Handler.HasMetadater() && err == nil && !isDir {
  303. if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
  304. fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %q: %+v", name, errMetadata)
  305. }
  306. }
  307. return err
  308. }
// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		// if the path already exists Stat returns a nil error and we return
		// nil too, treating the mkdir as a no-op; any other Stat error is
		// propagated as is
		return err
	}
	return fs.mkdirInternal(name)
}
// Symlink creates source as a symbolic link to target.
// Symlinks are not supported on S3.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}
// Readlink returns the destination of the named symbolic link.
// Symlinks are not supported on S3.
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}
// Chown changes the numeric uid and gid of the named file.
// Ownership is not supported on S3.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}
// Chmod changes the mode of the named file to mode.
// Permissions are not supported on S3.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}
  333. // Chtimes changes the access and modification times of the named file.
  334. func (fs *S3Fs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
  335. if !plugin.Handler.HasMetadater() {
  336. return ErrVfsUnsupported
  337. }
  338. if !isUploading {
  339. info, err := fs.Stat(name)
  340. if err != nil {
  341. return err
  342. }
  343. if info.IsDir() {
  344. return ErrVfsUnsupported
  345. }
  346. }
  347. return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
  348. util.GetTimeAsMsSinceEpoch(mtime))
  349. }
// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}
  356. // ReadDir reads the directory named by dirname and returns
  357. // a list of directory entries.
  358. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  359. var result []os.FileInfo
  360. // dirname must be already cleaned
  361. prefix := fs.getPrefix(dirname)
  362. modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
  363. if err != nil {
  364. return result, err
  365. }
  366. prefixes := make(map[string]bool)
  367. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  368. Bucket: aws.String(fs.config.Bucket),
  369. Prefix: aws.String(prefix),
  370. Delimiter: aws.String("/"),
  371. })
  372. for paginator.HasMorePages() {
  373. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  374. defer cancelFn()
  375. page, err := paginator.NextPage(ctx)
  376. if err != nil {
  377. metric.S3ListObjectsCompleted(err)
  378. return result, err
  379. }
  380. for _, p := range page.CommonPrefixes {
  381. // prefixes have a trailing slash
  382. name, _ := fs.resolve(p.Prefix, prefix)
  383. if name == "" {
  384. continue
  385. }
  386. if _, ok := prefixes[name]; ok {
  387. continue
  388. }
  389. result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  390. prefixes[name] = true
  391. }
  392. for _, fileObject := range page.Contents {
  393. objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
  394. name, isDir := fs.resolve(fileObject.Key, prefix)
  395. if name == "" || name == "/" {
  396. continue
  397. }
  398. if isDir {
  399. if _, ok := prefixes[name]; ok {
  400. continue
  401. }
  402. prefixes[name] = true
  403. }
  404. if t, ok := modTimes[name]; ok {
  405. objectModTime = util.GetTimeFromMsecSinceEpoch(t)
  406. }
  407. result = append(result, NewFileInfo(name, (isDir && fileObject.Size == 0), fileObject.Size,
  408. objectModTime, false))
  409. }
  410. }
  411. metric.S3ListObjectsCompleted(nil)
  412. return result, nil
  413. }
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}
  425. // IsNotExist returns a boolean indicating whether the error is known to
  426. // report that a file or directory does not exist
  427. func (*S3Fs) IsNotExist(err error) bool {
  428. if err == nil {
  429. return false
  430. }
  431. var re *awshttp.ResponseError
  432. if errors.As(err, &re) {
  433. if re.Response != nil {
  434. return re.Response.StatusCode == http.StatusNotFound
  435. }
  436. }
  437. return false
  438. }
  439. // IsPermission returns a boolean indicating whether the error is known to
  440. // report that permission is denied.
  441. func (*S3Fs) IsPermission(err error) bool {
  442. if err == nil {
  443. return false
  444. }
  445. var re *awshttp.ResponseError
  446. if errors.As(err, &re) {
  447. if re.Response != nil {
  448. return re.Response.StatusCode == http.StatusForbidden ||
  449. re.Response.StatusCode == http.StatusUnauthorized
  450. }
  451. }
  452. return false
  453. }
  454. // IsNotSupported returns true if the error indicate an unsupported operation
  455. func (*S3Fs) IsNotSupported(err error) bool {
  456. if err == nil {
  457. return false
  458. }
  459. return err == ErrVfsUnsupported
  460. }
  461. // CheckRootPath creates the specified local root directory if it does not exists
  462. func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
  463. // we need a local directory for temporary files
  464. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
  465. return osFs.CheckRootPath(username, uid, gid)
  466. }
// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	// scan everything below the configured key prefix
	return fs.GetDirSize(fs.config.KeyPrefix)
}
  472. func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
  473. fileNames := make(map[string]bool)
  474. prefix := ""
  475. if fsPrefix != "/" {
  476. prefix = strings.TrimPrefix(fsPrefix, "/")
  477. }
  478. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  479. Bucket: aws.String(fs.config.Bucket),
  480. Prefix: aws.String(prefix),
  481. Delimiter: aws.String("/"),
  482. })
  483. for paginator.HasMorePages() {
  484. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  485. defer cancelFn()
  486. page, err := paginator.NextPage(ctx)
  487. if err != nil {
  488. metric.S3ListObjectsCompleted(err)
  489. if err != nil {
  490. fsLog(fs, logger.LevelError, "unable to get content for prefix %q: %+v", prefix, err)
  491. return nil, err
  492. }
  493. return fileNames, err
  494. }
  495. for _, fileObject := range page.Contents {
  496. name, isDir := fs.resolve(fileObject.Key, prefix)
  497. if name != "" && !isDir {
  498. fileNames[name] = true
  499. }
  500. }
  501. }
  502. metric.S3ListObjectsCompleted(nil)
  503. return fileNames, nil
  504. }
// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}
  509. // GetDirSize returns the number of files and the size for a folder
  510. // including any subfolders
  511. func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
  512. prefix := fs.getPrefix(dirname)
  513. numFiles := 0
  514. size := int64(0)
  515. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  516. Bucket: aws.String(fs.config.Bucket),
  517. Prefix: aws.String(prefix),
  518. })
  519. for paginator.HasMorePages() {
  520. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  521. defer cancelFn()
  522. page, err := paginator.NextPage(ctx)
  523. if err != nil {
  524. metric.S3ListObjectsCompleted(err)
  525. return numFiles, size, err
  526. }
  527. for _, fileObject := range page.Contents {
  528. isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
  529. if isDir && fileObject.Size == 0 {
  530. continue
  531. }
  532. numFiles++
  533. size += fileObject.Size
  534. if numFiles%1000 == 0 {
  535. fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
  536. }
  537. }
  538. }
  539. metric.S3ListObjectsCompleted(nil)
  540. return numFiles, size, nil
  541. }
// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}
// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		// paths outside the configured key prefix are remapped to the root
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		// prepend the virtual folder mount point, if any
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}
  568. // Walk walks the file tree rooted at root, calling walkFn for each file or
  569. // directory in the tree, including root. The result are unordered
  570. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  571. prefix := fs.getPrefix(root)
  572. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  573. Bucket: aws.String(fs.config.Bucket),
  574. Prefix: aws.String(prefix),
  575. })
  576. for paginator.HasMorePages() {
  577. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  578. defer cancelFn()
  579. page, err := paginator.NextPage(ctx)
  580. if err != nil {
  581. metric.S3ListObjectsCompleted(err)
  582. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
  583. return err
  584. }
  585. for _, fileObject := range page.Contents {
  586. name, isDir := fs.resolve(fileObject.Key, prefix)
  587. if name == "" {
  588. continue
  589. }
  590. err := walkFn(util.GetStringFromPointer(fileObject.Key),
  591. NewFileInfo(name, isDir, fileObject.Size, util.GetTimeFromPointer(fileObject.LastModified), false), nil)
  592. if err != nil {
  593. return err
  594. }
  595. }
  596. }
  597. metric.S3ListObjectsCompleted(nil)
  598. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
  599. return nil
  600. }
  601. // Join joins any number of path elements into a single path
  602. func (*S3Fs) Join(elem ...string) string {
  603. return strings.TrimPrefix(path.Join(elem...), "/")
  604. }
// HasVirtualFolders returns true if folders are emulated.
// S3 has no real directories, they are emulated via key prefixes
func (*S3Fs) HasVirtualFolders() bool {
	return true
}
  609. // ResolvePath returns the matching filesystem path for the specified virtual path
  610. func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
  611. if fs.mountPath != "" {
  612. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  613. }
  614. if !path.IsAbs(virtualPath) {
  615. virtualPath = path.Clean("/" + virtualPath)
  616. }
  617. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  618. }
// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) error {
	return fs.copyFileInternal(source, target, srcSize)
}
  623. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  624. result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
  625. isDir := strings.HasSuffix(result, "/")
  626. if isDir {
  627. result = strings.TrimSuffix(result, "/")
  628. }
  629. return result, isDir
  630. }
  631. func (fs *S3Fs) setConfigDefaults() {
  632. if fs.config.UploadPartSize == 0 {
  633. fs.config.UploadPartSize = manager.DefaultUploadPartSize
  634. } else {
  635. if fs.config.UploadPartSize < 1024*1024 {
  636. fs.config.UploadPartSize *= 1024 * 1024
  637. }
  638. }
  639. if fs.config.UploadConcurrency == 0 {
  640. fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
  641. }
  642. if fs.config.DownloadPartSize == 0 {
  643. fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
  644. } else {
  645. if fs.config.DownloadPartSize < 1024*1024 {
  646. fs.config.DownloadPartSize *= 1024 * 1024
  647. }
  648. }
  649. if fs.config.DownloadConcurrency == 0 {
  650. fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
  651. }
  652. }
// copyFileInternal copies source to target within the bucket using a server
// side copy: a multipart copy for files larger than 500 MB, a single
// CopyObject call otherwise.
func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))
	if fileSize > 500*1024*1024 {
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fileSize)
		err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(copySource),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	metric.S3CopyObjectCompleted(err)
	return err
}
// renameInternal renames source to target as copy + delete. For directories
// the behavior depends on the package-level renameMode: mode 0 only allows
// renaming empty directories, mode 1 recursively renames the contents too.
// It returns the number of renamed files and their total size.
func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if fi.IsDir() {
		if renameMode == 0 {
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("cannot rename non empty directory: %q", source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			entries, err := fs.ReadDir(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			for _, info := range entries {
				sourceEntry := fs.Join(source, info.Name())
				targetEntry := fs.Join(target, info.Name())
				files, size, err := fs.renameInternal(sourceEntry, targetEntry, info)
				if err != nil {
					// entries removed concurrently are skipped, not fatal
					if fs.IsNotExist(err) {
						fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
						continue
					}
					return numFiles, filesSize, err
				}
				numFiles += files
				filesSize += size
			}
		}
	} else {
		if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += fi.Size()
		// best effort: preserve the modification time via the metadata plugin
		if plugin.Handler.HasMetadater() {
			err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %q -> %q: %+v",
					source, target, err)
			}
		}
	}
	// remove the source; a concurrent removal is not treated as an error
	err := fs.Remove(source, fi.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}
  733. func (fs *S3Fs) mkdirInternal(name string) error {
  734. if !strings.HasSuffix(name, "/") {
  735. name += "/"
  736. }
  737. _, w, _, err := fs.Create(name, -1, 0)
  738. if err != nil {
  739. return err
  740. }
  741. return w.Close()
  742. }
// hasContents returns true if the specified name is a prefix containing at
// least one object other than its own directory marker, so it is a non
// empty virtual directory.
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
		// 2 keys are enough: the first result may be the prefix marker itself
		MaxKeys: 2,
	})
	// a single page is enough, so "if" instead of "for"
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			// skip the directory marker for the prefix itself
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}
// doMultipartCopy copies an object server-side using the S3 multipart copy
// API, uploading the parts concurrently. source is the copy source
// ("bucket/key" form expected by CopySource — assumed already escaped by the
// caller, TODO confirm), target is the destination key and fileSize is the
// size of the source object, used to split the copy into byte ranges.
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		// objects above 100 GiB use 500 MB parts to keep the part count low
		maxPartSize = int64(500 * 1024 * 1024)
	}
	guard := make(chan struct{}, 10) // semaphore bounding concurrent part copies
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex // protects completedParts
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once // only the first failing part records the error and aborts
	var copyError error   // written once inside errOnce, read only after wg.Wait()
	var partNumber int32
	var offset int64
	// opCtx cancels in-flight part copies as soon as one of them fails
	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()
	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true // last (possibly short) part
		}
		offset = end
		guard <- struct{}{} // acquire a slot; blocks while 10 copies are running
		if hasError.Load() {
			// a previous part already failed: stop scheduling new parts
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard // release the semaphore slot
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:     aws.String(fs.config.Bucket),
				CopySource: aws.String(source),
				Key:        aws.String(target),
				PartNumber: partNum,
				UploadId:   aws.String(uploadID),
				// HTTP range end is inclusive, hence partEnd-1
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()
					// best-effort abort so the partial upload does not linger
					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)
	if copyError != nil {
		return copyError
	}
	// parts finish out of order; CompleteMultipartUpload wants them ascending
	sort.Slice(completedParts, func(i, j int) bool {
		return completedParts[i].PartNumber < completedParts[j].PartNumber
	})
	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}
  884. func (fs *S3Fs) getPrefix(name string) string {
  885. prefix := ""
  886. if name != "" && name != "." && name != "/" {
  887. prefix = strings.TrimPrefix(name, "/")
  888. if !strings.HasSuffix(prefix, "/") {
  889. prefix += "/"
  890. }
  891. }
  892. return prefix
  893. }
  894. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  895. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  896. defer cancelFn()
  897. obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
  898. Bucket: aws.String(fs.config.Bucket),
  899. Key: aws.String(name),
  900. })
  901. metric.S3HeadObjectCompleted(err)
  902. return obj, err
  903. }
  904. // GetMimeType returns the content type
  905. func (fs *S3Fs) GetMimeType(name string) (string, error) {
  906. obj, err := fs.headObject(name)
  907. if err != nil {
  908. return "", err
  909. }
  910. return util.GetStringFromPointer(obj.ContentType), nil
  911. }
// Close closes the fs
func (*S3Fs) Close() error {
	// no-op: there is nothing to release for this implementation
	return nil
}
// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
	// always unsupported: this backend does not report a storage capacity
	return nil, ErrStorageSizeUnavailable
}
  920. func (fs *S3Fs) getStorageID() string {
  921. if fs.config.Endpoint != "" {
  922. if !strings.HasSuffix(fs.config.Endpoint, "/") {
  923. return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
  924. }
  925. return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
  926. }
  927. return fmt.Sprintf("s3://%v", fs.config.Bucket)
  928. }
  929. func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration) *awshttp.BuildableClient {
  930. c := awshttp.NewBuildableClient().
  931. WithDialerOptions(func(d *net.Dialer) {
  932. d.Timeout = 8 * time.Second
  933. }).
  934. WithTransportOptions(func(tr *http.Transport) {
  935. tr.IdleConnTimeout = idleConnectionTimeout
  936. tr.WriteBufferSize = s3TransferBufferSize
  937. tr.ReadBufferSize = s3TransferBufferSize
  938. })
  939. if timeout > 0 {
  940. c = c.WithTimeout(time.Duration(timeout) * time.Second)
  941. }
  942. return c
  943. }
  944. // ideally we should simply use url.PathEscape:
  945. //
  946. // https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
  947. //
  948. // but this cause issue with some vendors, see #483, the code below is copied from rclone
  949. func pathEscape(in string) string {
  950. var u url.URL
  951. u.Path = in
  952. return strings.ReplaceAll(u.String(), "+", "%2B")
  953. }