s3fs.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. //go:build !nos3
  15. // +build !nos3
  16. package vfs
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "mime"
  22. "net"
  23. "net/http"
  24. "net/url"
  25. "os"
  26. "path"
  27. "path/filepath"
  28. "sort"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/aws/aws-sdk-go-v2/aws"
  34. awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
  35. "github.com/aws/aws-sdk-go-v2/config"
  36. "github.com/aws/aws-sdk-go-v2/credentials"
  37. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  38. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  39. "github.com/aws/aws-sdk-go-v2/service/s3"
  40. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  41. "github.com/aws/aws-sdk-go-v2/service/sts"
  42. "github.com/eikenb/pipeat"
  43. "github.com/pkg/sftp"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/metric"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/version"
  49. )
const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType = "application/x-directory"
	// buffer size used for S3 transfers
	s3TransferBufferSize = 256 * 1024
	// display name for this Fs implementation
	s3fsName = "S3Fs"
)

var (
	// content types that mark an object as a "directory" placeholder;
	// some providers use "httpd/unix-directory" instead of the s3fs-fuse type
	s3DirMimeTypes = []string{s3DirMimeType, "httpd/unix-directory"}
)
// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	// local directory used for temporary files (pipe buffers)
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath string
	config    *S3FsConfig
	svc       *s3.Client
	// timeout applied to individual S3 API calls
	ctxTimeout time.Duration
}
// init advertises S3 support in the build feature list
// (this file is excluded by the nos3 build tag).
func init() {
	version.AddFeature("+s3")
}
// NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		// fall back to the configured temp path or the OS temp dir:
		// a local directory is always needed for pipe buffers
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(getAWSHTTPClient(0, 30*time.Second)))
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		// static credentials take precedence over the default AWS credential chain
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	if fs.config.Endpoint != "" {
		// custom endpoint for S3-compatible providers (MinIO, SeaweedFS, ...)
		endpointResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...any) (aws.Endpoint, error) {
			return aws.Endpoint{
				URL:               fs.config.Endpoint,
				HostnameImmutable: fs.config.ForcePathStyle,
				PartitionID:       "aws",
				SigningRegion:     fs.config.Region,
				Source:            aws.EndpointSourceCustom,
			}, nil
		})
		awsConfig.EndpointResolverWithOptions = endpointResolver
	}
	fs.setConfigDefaults()
	if fs.config.RoleARN != "" {
		// assume the configured role via STS; this replaces any static credentials set above
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.UsePathStyle = fs.config.ForcePathStyle
	})
	return fs, nil
}
  131. // Name returns the name for the Fs implementation
  132. func (fs *S3Fs) Name() string {
  133. return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
  134. }
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}
  139. // Stat returns a FileInfo describing the named file
  140. func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
  141. var result *FileInfo
  142. if name == "" || name == "/" || name == "." {
  143. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  144. }
  145. if fs.config.KeyPrefix == name+"/" {
  146. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  147. }
  148. obj, err := fs.headObject(name)
  149. if err == nil {
  150. // Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
  151. // So we check some common content types to detect if this is a "directory".
  152. isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
  153. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, obj.ContentLength,
  154. util.GetTimeFromPointer(obj.LastModified), false))
  155. }
  156. if !fs.IsNotExist(err) {
  157. return result, err
  158. }
  159. // now check if this is a prefix (virtual directory)
  160. hasContents, err := fs.hasContents(name)
  161. if err == nil && hasContents {
  162. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  163. } else if err != nil {
  164. return nil, err
  165. }
  166. // the requested file may still be a directory as a zero bytes key
  167. // with a trailing forward slash (created using mkdir).
  168. // S3 doesn't return content type when listing objects, so we have
  169. // create "dirs" adding a trailing "/" to the key
  170. return fs.getStatForDir(name)
  171. }
  172. func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
  173. var result *FileInfo
  174. obj, err := fs.headObject(name + "/")
  175. if err != nil {
  176. return result, err
  177. }
  178. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, obj.ContentLength,
  179. util.GetTimeFromPointer(obj.LastModified), false))
  180. }
// Lstat returns a FileInfo describing the named file.
// S3 has no symlinks, so this is identical to Stat.
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
// Open opens the named file for reading.
// The download runs in a background goroutine that writes into a pipe;
// the returned PipeReaderAt is the read side and the returned func cancels
// the transfer.
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		// a per-part timeout is only applied for full downloads: with a
		// non-zero offset (resume) the ranged request shape differs
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		// RFC 7233 open-ended byte range: from offset to the end of the object
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		// propagate the download error (or nil) to the pipe reader
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing.
// The upload runs in a background goroutine that reads from a pipe; the
// returned PipeWriter is the write side and the returned func cancels the
// transfer. A flag of -1 marks the object as a directory placeholder.
func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
		if flag == -1 {
			// -1 is the internal convention used by mkdirInternal to create
			// a zero-byte directory placeholder
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		// propagate the upload error (or nil) to the pipe writer
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, acl: %#v, readed bytes: %v, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}
// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a CopyObject call.
func (fs *S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non empty directory: %q", source)
		}
		// an empty directory is just a placeholder key: recreate it at the target
		if err := fs.mkdirInternal(target); err != nil {
			return err
		}
	} else {
		contentType := mime.TypeByExtension(path.Ext(source))
		copySource := pathEscape(fs.Join(fs.config.Bucket, source))
		// single CopyObject is limited, use multipart copy above 500 MB
		if fi.Size() > 500*1024*1024 {
			fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
				source, fi.Size())
			err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
		} else {
			ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
			defer cancelFn()
			_, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
				Bucket:       aws.String(fs.config.Bucket),
				CopySource:   aws.String(copySource),
				Key:          aws.String(target),
				StorageClass: types.StorageClass(fs.config.StorageClass),
				ACL:          types.ObjectCannedACL(fs.config.ACL),
				ContentType:  util.NilIfEmpty(contentType),
			})
		}
		if err != nil {
			metric.S3CopyObjectCompleted(err)
			return err
		}
		// wait for the copied object to become visible before deleting the source
		waiter := s3.NewObjectExistsWaiter(fs.svc)
		err = waiter.Wait(context.Background(), &s3.HeadObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(target),
		}, 10*time.Second)
		metric.S3CopyObjectCompleted(err)
		if err != nil {
			return err
		}
		if plugin.Handler.HasMetadater() {
			// best effort: a failure to preserve the mtime does not fail the rename
			err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
					source, target, err)
			}
		}
	}
	return fs.Remove(source, fi.IsDir())
}
// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %#v", name)
		}
		// directory placeholders are stored as keys with a trailing "/"
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	// best-effort cleanup of the plugin-stored modification time for files
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %+v", name, errMetadata)
		}
	}
	return err
}
  354. // Mkdir creates a new directory with the specified name and default permissions
  355. func (fs *S3Fs) Mkdir(name string) error {
  356. _, err := fs.Stat(name)
  357. if !fs.IsNotExist(err) {
  358. return err
  359. }
  360. return fs.mkdirInternal(name)
  361. }
// Symlink creates source as a symbolic link to target.
// S3 has no symlinks, so this is always unsupported.
func (*S3Fs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}
// Readlink returns the destination of the named symbolic link.
// S3 has no symlinks, so this is always unsupported.
func (*S3Fs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}
// Chown changes the numeric uid and gid of the named file.
// Object storage has no POSIX ownership, so this is always unsupported.
func (*S3Fs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}
// Chmod changes the mode of the named file to mode.
// Object storage has no POSIX permissions, so this is always unsupported.
func (*S3Fs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}
// Chtimes changes the access and modification times of the named file.
// Only the modification time is persisted, via the metadata plugin;
// without a metadater the operation is unsupported. atime is ignored.
func (fs *S3Fs) Chtimes(name string, atime, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		// times are only tracked for files, not virtual directories
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}
// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}
  401. // ReadDir reads the directory named by dirname and returns
  402. // a list of directory entries.
  403. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  404. var result []os.FileInfo
  405. // dirname must be already cleaned
  406. prefix := fs.getPrefix(dirname)
  407. modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
  408. if err != nil {
  409. return result, err
  410. }
  411. prefixes := make(map[string]bool)
  412. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  413. Bucket: aws.String(fs.config.Bucket),
  414. Prefix: aws.String(prefix),
  415. Delimiter: aws.String("/"),
  416. })
  417. for paginator.HasMorePages() {
  418. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  419. defer cancelFn()
  420. page, err := paginator.NextPage(ctx)
  421. if err != nil {
  422. metric.S3ListObjectsCompleted(err)
  423. return result, err
  424. }
  425. for _, p := range page.CommonPrefixes {
  426. // prefixes have a trailing slash
  427. name, _ := fs.resolve(p.Prefix, prefix)
  428. if name == "" {
  429. continue
  430. }
  431. if _, ok := prefixes[name]; ok {
  432. continue
  433. }
  434. result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  435. prefixes[name] = true
  436. }
  437. for _, fileObject := range page.Contents {
  438. objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
  439. name, isDir := fs.resolve(fileObject.Key, prefix)
  440. if name == "" || name == "/" {
  441. continue
  442. }
  443. if isDir {
  444. if _, ok := prefixes[name]; ok {
  445. continue
  446. }
  447. prefixes[name] = true
  448. }
  449. if t, ok := modTimes[name]; ok {
  450. objectModTime = util.GetTimeFromMsecSinceEpoch(t)
  451. }
  452. result = append(result, NewFileInfo(name, (isDir && fileObject.Size == 0), fileObject.Size,
  453. objectModTime, false))
  454. }
  455. }
  456. metric.S3ListObjectsCompleted(nil)
  457. return result, nil
  458. }
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}
  470. // IsNotExist returns a boolean indicating whether the error is known to
  471. // report that a file or directory does not exist
  472. func (*S3Fs) IsNotExist(err error) bool {
  473. if err == nil {
  474. return false
  475. }
  476. var re *awshttp.ResponseError
  477. if errors.As(err, &re) {
  478. if re.Response != nil {
  479. return re.Response.StatusCode == http.StatusNotFound
  480. }
  481. }
  482. return false
  483. }
  484. // IsPermission returns a boolean indicating whether the error is known to
  485. // report that permission is denied.
  486. func (*S3Fs) IsPermission(err error) bool {
  487. if err == nil {
  488. return false
  489. }
  490. var re *awshttp.ResponseError
  491. if errors.As(err, &re) {
  492. if re.Response != nil {
  493. return re.Response.StatusCode == http.StatusForbidden ||
  494. re.Response.StatusCode == http.StatusUnauthorized
  495. }
  496. }
  497. return false
  498. }
  499. // IsNotSupported returns true if the error indicate an unsupported operation
  500. func (*S3Fs) IsNotSupported(err error) bool {
  501. if err == nil {
  502. return false
  503. }
  504. return err == ErrVfsUnsupported
  505. }
// CheckRootPath creates the specified local root directory if it does not exists
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
	return osFs.CheckRootPath(username, uid, gid)
}
  512. // ScanRootDirContents returns the number of files contained in the bucket,
  513. // and their size
  514. func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
  515. numFiles := 0
  516. size := int64(0)
  517. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  518. Bucket: aws.String(fs.config.Bucket),
  519. Prefix: aws.String(fs.config.KeyPrefix),
  520. })
  521. for paginator.HasMorePages() {
  522. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  523. defer cancelFn()
  524. page, err := paginator.NextPage(ctx)
  525. if err != nil {
  526. metric.S3ListObjectsCompleted(err)
  527. return numFiles, size, err
  528. }
  529. for _, fileObject := range page.Contents {
  530. isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
  531. if isDir && fileObject.Size == 0 {
  532. continue
  533. }
  534. numFiles++
  535. size += fileObject.Size
  536. if numFiles%1000 == 0 {
  537. fsLog(fs, logger.LevelDebug, "root dir scan in progress, files: %d, size: %d", numFiles, size)
  538. }
  539. }
  540. }
  541. metric.S3ListObjectsCompleted(nil)
  542. return numFiles, size, nil
  543. }
  544. func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
  545. fileNames := make(map[string]bool)
  546. prefix := ""
  547. if fsPrefix != "/" {
  548. prefix = strings.TrimPrefix(fsPrefix, "/")
  549. }
  550. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  551. Bucket: aws.String(fs.config.Bucket),
  552. Prefix: aws.String(prefix),
  553. Delimiter: aws.String("/"),
  554. })
  555. for paginator.HasMorePages() {
  556. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  557. defer cancelFn()
  558. page, err := paginator.NextPage(ctx)
  559. if err != nil {
  560. metric.S3ListObjectsCompleted(err)
  561. if err != nil {
  562. fsLog(fs, logger.LevelError, "unable to get content for prefix %#v: %+v", prefix, err)
  563. return nil, err
  564. }
  565. return fileNames, err
  566. }
  567. for _, fileObject := range page.Contents {
  568. name, isDir := fs.resolve(fileObject.Key, prefix)
  569. if name != "" && !isDir {
  570. fileNames[name] = true
  571. }
  572. }
  573. }
  574. metric.S3ListObjectsCompleted(nil)
  575. return fileNames, nil
  576. }
// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}
// GetDirSize returns the number of files and the size for a folder
// including any subfolders.
// Not implemented for S3.
func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}
// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}
// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		// a path outside the configured key prefix maps to the virtual root
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		// prepend the virtual folder mount point last, after prefix stripping
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}
  612. // Walk walks the file tree rooted at root, calling walkFn for each file or
  613. // directory in the tree, including root. The result are unordered
  614. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  615. prefix := fs.getPrefix(root)
  616. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  617. Bucket: aws.String(fs.config.Bucket),
  618. Prefix: aws.String(prefix),
  619. })
  620. for paginator.HasMorePages() {
  621. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  622. defer cancelFn()
  623. page, err := paginator.NextPage(ctx)
  624. if err != nil {
  625. metric.S3ListObjectsCompleted(err)
  626. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
  627. return err
  628. }
  629. for _, fileObject := range page.Contents {
  630. name, isDir := fs.resolve(fileObject.Key, prefix)
  631. if name == "" {
  632. continue
  633. }
  634. err := walkFn(util.GetStringFromPointer(fileObject.Key),
  635. NewFileInfo(name, isDir, fileObject.Size, util.GetTimeFromPointer(fileObject.LastModified), false), nil)
  636. if err != nil {
  637. return err
  638. }
  639. }
  640. }
  641. metric.S3ListObjectsCompleted(nil)
  642. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
  643. return nil
  644. }
  645. // Join joins any number of path elements into a single path
  646. func (*S3Fs) Join(elem ...string) string {
  647. return strings.TrimPrefix(path.Join(elem...), "/")
  648. }
// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}
// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	// strip the virtual folder mount point before mapping to an object key
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}
  663. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  664. result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
  665. isDir := strings.HasSuffix(result, "/")
  666. if isDir {
  667. result = strings.TrimSuffix(result, "/")
  668. }
  669. return result, isDir
  670. }
  671. func (fs *S3Fs) setConfigDefaults() {
  672. if fs.config.UploadPartSize == 0 {
  673. fs.config.UploadPartSize = manager.DefaultUploadPartSize
  674. } else {
  675. if fs.config.UploadPartSize < 1024*1024 {
  676. fs.config.UploadPartSize *= 1024 * 1024
  677. }
  678. }
  679. if fs.config.UploadConcurrency == 0 {
  680. fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
  681. }
  682. if fs.config.DownloadPartSize == 0 {
  683. fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
  684. } else {
  685. if fs.config.DownloadPartSize < 1024*1024 {
  686. fs.config.DownloadPartSize *= 1024 * 1024
  687. }
  688. }
  689. if fs.config.DownloadConcurrency == 0 {
  690. fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
  691. }
  692. }
  693. func (fs *S3Fs) mkdirInternal(name string) error {
  694. if !strings.HasSuffix(name, "/") {
  695. name += "/"
  696. }
  697. _, w, _, err := fs.Create(name, -1)
  698. if err != nil {
  699. return err
  700. }
  701. return w.Close()
  702. }
// hasContents reports whether the virtual directory name contains at least
// one object other than its own placeholder key.
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
		// two keys are enough: the placeholder plus one real entry
		MaxKeys: 2,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			// skip the directory placeholder itself, which resolves to "" or "/"
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}
// doMultipartCopy copies the object "source" to "target" server side
// using a multipart upload. Parts are copied concurrently, at most 10
// at a time; on the first failure the in-flight copies are canceled
// and the multipart upload is aborted.
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		// larger parts for files over 100 GB — presumably to keep the part
		// count within S3's per-upload limit; TODO confirm
		maxPartSize = int64(500 * 1024 * 1024)
	}
	guard := make(chan struct{}, 10) // semaphore bounding concurrent part copies
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex // protects completedParts
	var wg sync.WaitGroup
	var hasError atomic.Bool // set once on the first part copy failure
	var errOnce sync.Once
	var copyError error // written once inside errOnce; read only after wg.Wait()
	var partNumber int32
	var offset int64
	// opCtx is canceled on the first error so pending part copies stop early
	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()
	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end
		guard <- struct{}{} // acquire a slot; blocks while 10 copies are in flight
		if hasError.Load() {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard // release the semaphore slot
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:     aws.String(fs.config.Bucket),
				CopySource: aws.String(source),
				Key:        aws.String(target),
				PartNumber: partNum,
				UploadId:   aws.String(uploadID),
				// byte range end is inclusive, hence partEnd-1
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				// only the first failing part records the error, cancels the
				// other copies and aborts the whole multipart upload
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()
					// fresh context for the abort: opCtx is canceled at this point
					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)
	if copyError != nil {
		return copyError
	}
	// parts may finish out of order; CompleteMultipartUpload requires them
	// sorted by part number
	sort.Slice(completedParts, func(i, j int) bool {
		return completedParts[i].PartNumber < completedParts[j].PartNumber
	})
	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}
  844. func (fs *S3Fs) getPrefix(name string) string {
  845. prefix := ""
  846. if name != "" && name != "." && name != "/" {
  847. prefix = strings.TrimPrefix(name, "/")
  848. if !strings.HasSuffix(prefix, "/") {
  849. prefix += "/"
  850. }
  851. }
  852. return prefix
  853. }
  854. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  855. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  856. defer cancelFn()
  857. obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
  858. Bucket: aws.String(fs.config.Bucket),
  859. Key: aws.String(name),
  860. })
  861. metric.S3HeadObjectCompleted(err)
  862. return obj, err
  863. }
  864. // GetMimeType returns the content type
  865. func (fs *S3Fs) GetMimeType(name string) (string, error) {
  866. obj, err := fs.headObject(name)
  867. if err != nil {
  868. return "", err
  869. }
  870. return util.GetStringFromPointer(obj.ContentType), nil
  871. }
// Close closes the fs. It is a no-op for S3: there is no per-fs state
// to release, so it always returns nil.
func (*S3Fs) Close() error {
	return nil
}
// GetAvailableDiskSize returns the available size for the specified path.
// It is unsupported on S3 and always returns ErrStorageSizeUnavailable.
func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}
  880. func (fs *S3Fs) getStorageID() string {
  881. if fs.config.Endpoint != "" {
  882. if !strings.HasSuffix(fs.config.Endpoint, "/") {
  883. return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
  884. }
  885. return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
  886. }
  887. return fmt.Sprintf("s3://%v", fs.config.Bucket)
  888. }
  889. func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration) *awshttp.BuildableClient {
  890. c := awshttp.NewBuildableClient().
  891. WithDialerOptions(func(d *net.Dialer) {
  892. d.Timeout = 8 * time.Second
  893. }).
  894. WithTransportOptions(func(tr *http.Transport) {
  895. tr.IdleConnTimeout = idleConnectionTimeout
  896. tr.WriteBufferSize = s3TransferBufferSize
  897. tr.ReadBufferSize = s3TransferBufferSize
  898. })
  899. if timeout > 0 {
  900. c = c.WithTimeout(time.Duration(timeout) * time.Second)
  901. }
  902. return c
  903. }
  904. // ideally we should simply use url.PathEscape:
  905. //
  906. // https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
  907. //
  908. // but this cause issue with some vendors, see #483, the code below is copied from rclone
  909. func pathEscape(in string) string {
  910. var u url.URL
  911. u.Path = in
  912. return strings.ReplaceAll(u.String(), "+", "%2B")
  913. }