s3fs.go

// Copyright (C) 2019-2022 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3
// +build !nos3

package vfs

import (
	"context"
	"errors"
	"fmt"
	"mime"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/internal/logger"
	"github.com/drakkan/sftpgo/v2/internal/metric"
	"github.com/drakkan/sftpgo/v2/internal/plugin"
	"github.com/drakkan/sftpgo/v2/internal/util"
	"github.com/drakkan/sftpgo/v2/internal/version"
)

const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType        = "application/x-directory"
	s3TransferBufferSize = 256 * 1024
	s3fsName             = "S3Fs"
)

var (
	s3DirMimeTypes = []string{s3DirMimeType, "httpd/unix-directory"}
)

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty this fs is mounted as a virtual folder in the specified path
	mountPath  string
	config     *S3FsConfig
	svc        *s3.Client
	ctxTimeout time.Duration
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3-compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(getAWSHTTPClient(0, 30*time.Second)))
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	if fs.config.Endpoint != "" {
		endpointResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...any) (aws.Endpoint, error) {
			return aws.Endpoint{
				URL:               fs.config.Endpoint,
				HostnameImmutable: fs.config.ForcePathStyle,
				PartitionID:       "aws",
				SigningRegion:     fs.config.Region,
				Source:            aws.EndpointSourceCustom,
			}, nil
		})
		awsConfig.EndpointResolverWithOptions = endpointResolver
	}
	fs.setConfigDefaults()

	if fs.config.RoleARN != "" {
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.UsePathStyle = fs.config.ForcePathStyle
	})
	return fs, nil
}

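// Hypothetical usage sketch (not part of the original file): wiring up an
// S3Fs for a path-style, S3-compatible endpoint. The field names follow the
// S3FsConfig fields referenced above; the values are placeholders.
//
//	cfg := S3FsConfig{}
//	cfg.Bucket = "backups"
//	cfg.Region = "us-east-1"
//	cfg.Endpoint = "https://minio.internal:9000"
//	cfg.ForcePathStyle = true // MinIO-style deployments usually need path-style addressing
//	s3fs, err := NewS3Fs("conn-id", "", "", cfg)
//	if err != nil {
//		// NewS3Fs returns a non-nil fs even on error, but it must not be used
//	}
//	_ = s3fs
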
// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "" || name == "/" || name == "." {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	}
	if fs.config.KeyPrefix == name+"/" {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
		// So we check some common content types to detect if this is a "directory".
		isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, obj.ContentLength,
			util.GetTimeFromPointer(obj.LastModified), false))
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory stored as a zero-byte key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have to
	// create "dirs" by adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

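// To summarize the lookup order used by Stat above: a HeadObject on the exact
// key first (plain objects, plus SeaweedFS-style directories detected via
// content type), then a prefix listing for virtual directories, and finally
// getStatForDir below, which heads name + "/" to find the zero-byte
// placeholder keys created by Mkdir.
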
func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, obj.ContentLength,
		util.GetTimeFromPointer(obj.LastModified), false))
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}

	go func() {
		defer cancelFn()

		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}

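// Hypothetical reader-side sketch (not part of the original file): the
// returned *pipeat.PipeReaderAt implements io.ReaderAt, so a caller can start
// reading while the downloader goroutine above is still writing; a ReadAt
// call blocks until the requested range is written or the writer side is
// closed. The path is a placeholder.
//
//	_, r, cancel, err := fs.Open("docs/readme.txt", 0)
//	if err == nil {
//		defer cancel()
//		buf := make([]byte, 32*1024)
//		n, _ := r.ReadAt(buf, 0)
//		_ = n
//	}
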
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond)
			})
		}
	})

	go func() {
		defer cancelFn()

		var contentType string
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, acl: %#v, read bytes: %v, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}

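// Note on the flag convention above: Mkdir reuses Create with flag == -1,
// which forces the s3DirMimeType content type so that the resulting zero-byte
// "name/" key is recognized as a directory by Stat and by s3fs-fuse.
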
// Rename renames (moves) source to target.
// We don't support renaming non-empty directories since we would have to
// rename all the contents too and this could take a long time: think
// about directories with thousands of files; for each file we would have to
// execute a CopyObject call.
func (fs *S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	copySource := fs.Join(fs.config.Bucket, source)
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non-empty directory: %#v", source)
		}
		if !strings.HasSuffix(copySource, "/") {
			copySource += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	var contentType string
	if fi.IsDir() {
		contentType = s3DirMimeType
	} else {
		contentType = mime.TypeByExtension(path.Ext(source))
	}
	copySource = pathEscape(copySource)

	if fi.Size() > 500*1024*1024 {
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fi.Size())
		err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
	} else {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		_, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			CopySource:   aws.String(copySource),
			Key:          aws.String(target),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			ContentType:  util.NilIfEmpty(contentType),
		})
	}
	if err != nil {
		metric.S3CopyObjectCompleted(err)
		return err
	}
	waiter := s3.NewObjectExistsWaiter(fs.svc)
	err = waiter.Wait(context.Background(), &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(target),
	}, 10*time.Second)
	metric.S3CopyObjectCompleted(err)
	if err != nil {
		return err
	}
	if plugin.Handler.HasMetadater() {
		if !fi.IsDir() {
			err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
					source, target, err)
			}
		}
	}
	return fs.Remove(source, fi.IsDir())
}

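// So a rename boils down to a server-side copy (a single CopyObject, or the
// multipart copy above 500 MB), a wait until the target key is visible, and a
// delete of the source. Note that CopySource must be the URL-escaped
// "bucket/key" form, which is why copySource is passed through pathEscape.
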
// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non-empty directory: %#v", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %+v", name, errMetadata)
		}
	}
	return err
}

// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	_, w, _, err := fs.Create(name, -1)
	if err != nil {
		return err
	}
	return w.Close()
}

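// Example of the resulting layout (illustrative name): Mkdir("photos") stores
// a zero-byte object under the key "photos/" with content type
// "application/x-directory", so both Stat("photos") and delimiter-based
// listings report it as a directory.
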
// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(name string, atime, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside the base transfer
func (*S3Fs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := fs.getPrefix(dirname)
	modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
	if err != nil {
		return result, err
	}
	prefixes := make(map[string]bool)

	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return result, err
		}
		for _, p := range page.CommonPrefixes {
			// prefixes have a trailing slash
			name, _ := fs.resolve(p.Prefix, prefix)
			if name == "" {
				continue
			}
			if _, ok := prefixes[name]; ok {
				continue
			}
			result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
			prefixes[name] = true
		}
		for _, fileObject := range page.Contents {
			objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			if isDir {
				if _, ok := prefixes[name]; ok {
					continue
				}
				prefixes[name] = true
			}
			if t, ok := modTimes[name]; ok {
				objectModTime = util.GetTimeFromMsecSinceEpoch(t)
			}
			result = append(result, NewFileInfo(name, (isDir && fileObject.Size == 0), fileObject.Size,
				objectModTime, false))
		}
	}
	metric.S3ListObjectsCompleted(nil)
	return result, nil
}

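// Shape of a delimited listing, for reference (illustrative keys): with
// Delimiter set to "/", a page for prefix "docs/" returns the immediate
// subdirectories in CommonPrefixes (e.g. "docs/sub/") and this level's
// objects in Contents (e.g. "docs/readme.txt"); deeper keys are not expanded,
// so each ReadDir call lists a single directory level.
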
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusNotFound
		}
	}
	return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusForbidden ||
				re.Response.StatusCode == http.StatusUnauthorized
		}
	}
	return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return err == ErrVfsUnsupported
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	numFiles := 0
	size := int64(0)

	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(fs.config.KeyPrefix),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return numFiles, size, err
		}
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
			if isDir && fileObject.Size == 0 {
				continue
			}
			numFiles++
			size += fileObject.Size
			if numFiles%1000 == 0 {
				fsLog(fs, logger.LevelDebug, "root dir scan in progress, files: %d, size: %d", numFiles, size)
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	return numFiles, size, nil
}

func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
	fileNames := make(map[string]bool)
	prefix := ""
	if fsPrefix != "/" {
		prefix = strings.TrimPrefix(fsPrefix, "/")
	}

	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			fsLog(fs, logger.LevelError, "unable to get content for prefix %#v: %+v", prefix, err)
			return nil, err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name != "" && !isDir {
				fileNames[name] = true
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	return fileNames, nil
}

// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}

// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := fs.getPrefix(root)

	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
			return err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			err := walkFn(util.GetStringFromPointer(fileObject.Key),
				NewFileInfo(name, isDir, fileObject.Size, util.GetTimeFromPointer(fileObject.LastModified), false), nil)
			if err != nil {
				return err
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
	return nil
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return strings.TrimPrefix(path.Join(elem...), "/")
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

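// resolve examples (illustrative keys): with prefix "docs/", the key
// "docs/readme.txt" resolves to ("readme.txt", false), while the directory
// placeholder key "docs/sub/" resolves to ("sub", true).
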
func (fs *S3Fs) setConfigDefaults() {
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = manager.DefaultUploadPartSize
	} else {
		if fs.config.UploadPartSize < 1024*1024 {
			fs.config.UploadPartSize *= 1024 * 1024
		}
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
	} else {
		if fs.config.DownloadPartSize < 1024*1024 {
			fs.config.DownloadPartSize *= 1024 * 1024
		}
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
	}
}

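// Sizing convention, for reference: part sizes smaller than 1024*1024 are
// interpreted as MiB, so a configured UploadPartSize of 16 becomes 16 MiB,
// while a value such as 33554432 is kept as an exact byte count.
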
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: 2,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()

		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}

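// MaxKeys is 2 because the listing may return the directory placeholder key
// itself ("name/", which resolves to an empty name and is skipped above):
// one extra key is enough to distinguish an empty directory from a non-empty
// one without listing the whole prefix.
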
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		maxPartSize = int64(500 * 1024 * 1024)
	}
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once
	var copyError error
	var partNumber int32
	var offset int64

	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()

	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end

		guard <- struct{}{}
		if hasError.Load() {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}

		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard
				wg.Done()
			}()

			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()

			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:          aws.String(fs.config.Bucket),
				CopySource:      aws.String(source),
				Key:             aws.String(target),
				PartNumber:      partNum,
				UploadId:        aws.String(uploadID),
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()

					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()

					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}

			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)

	if copyError != nil {
		return copyError
	}
	sort.Slice(completedParts, func(i, j int) bool {
		return completedParts[i].PartNumber < completedParts[j].PartNumber
	})

	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()

	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}

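// Worked sizing example for the part-size choice above: with 32 MB parts,
// S3's 10,000-part limit caps a multipart copy at 32 MiB * 10000 = 312.5 GiB,
// hence the switch to 500 MB parts beyond 100 GiB, which raises the ceiling
// to roughly 4.8 TiB, close to S3's 5 TB maximum object size.
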
func (fs *S3Fs) getPrefix(name string) string {
	prefix := ""
	if name != "" && name != "." && name != "/" {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	return prefix
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()

	obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) getStorageID() string {
	if fs.config.Endpoint != "" {
		if !strings.HasSuffix(fs.config.Endpoint, "/") {
			return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
		}
		return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
	}
	return fmt.Sprintf("s3://%v", fs.config.Bucket)
}

func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration) *awshttp.BuildableClient {
	c := awshttp.NewBuildableClient().
		WithDialerOptions(func(d *net.Dialer) {
			d.Timeout = 8 * time.Second
		}).
		WithTransportOptions(func(tr *http.Transport) {
			tr.IdleConnTimeout = idleConnectionTimeout
			tr.WriteBufferSize = s3TransferBufferSize
			tr.ReadBufferSize = s3TransferBufferSize
		})
	if timeout > 0 {
		c = c.WithTimeout(time.Duration(timeout) * time.Second)
	}
	return c
}

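// Usage note: the timeout argument is in seconds and 0 disables the overall
// request timeout; see getAWSHTTPClient(0, 30*time.Second) for the default
// client in NewS3Fs and the per-part clients built from UploadPartMaxTime and
// DownloadPartMaxTime in Create and Open.
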
// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483; the code below is copied from rclone
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}

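// Example (illustrative input): pathEscape("docs/a+b c.txt") returns
// "docs/a%2Bb%20c.txt": the "/" separators are kept, the space is
// percent-encoded by url.URL, and the "+" is forced to "%2B" for vendors
// that would otherwise decode it as a space.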