gcsfs.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. //go:build !nogcs
  15. // +build !nogcs
  16. package vfs
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "io"
  22. "mime"
  23. "net/http"
  24. "os"
  25. "path"
  26. "path/filepath"
  27. "strconv"
  28. "strings"
  29. "time"
  30. "cloud.google.com/go/storage"
  31. "github.com/eikenb/pipeat"
  32. "github.com/pkg/sftp"
  33. "github.com/rs/xid"
  34. "google.golang.org/api/googleapi"
  35. "google.golang.org/api/iterator"
  36. "google.golang.org/api/option"
  37. "github.com/drakkan/sftpgo/v2/internal/logger"
  38. "github.com/drakkan/sftpgo/v2/internal/metric"
  39. "github.com/drakkan/sftpgo/v2/internal/util"
  40. "github.com/drakkan/sftpgo/v2/internal/version"
  41. )
  42. const (
  43. defaultGCSPageSize = 5000
  44. )
  45. var (
  46. gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType", "Metadata"}
  47. )
  48. // GCSFs is a Fs implementation for Google Cloud Storage.
  49. type GCSFs struct {
  50. connectionID string
  51. localTempDir string
  52. // if not empty this fs is mouted as virtual folder in the specified path
  53. mountPath string
  54. config *GCSFsConfig
  55. svc *storage.Client
  56. ctxTimeout time.Duration
  57. ctxLongTimeout time.Duration
  58. }
  59. func init() {
  60. version.AddFeature("+gcs")
  61. }
  62. // NewGCSFs returns an GCSFs object that allows to interact with Google Cloud Storage
  63. func NewGCSFs(connectionID, localTempDir, mountPath string, config GCSFsConfig) (Fs, error) {
  64. if localTempDir == "" {
  65. localTempDir = getLocalTempDir()
  66. }
  67. var err error
  68. fs := &GCSFs{
  69. connectionID: connectionID,
  70. localTempDir: localTempDir,
  71. mountPath: getMountPath(mountPath),
  72. config: &config,
  73. ctxTimeout: 30 * time.Second,
  74. ctxLongTimeout: 300 * time.Second,
  75. }
  76. if err = fs.config.validate(); err != nil {
  77. return fs, err
  78. }
  79. ctx := context.Background()
  80. if fs.config.AutomaticCredentials > 0 {
  81. fs.svc, err = storage.NewClient(ctx)
  82. } else {
  83. err = fs.config.Credentials.TryDecrypt()
  84. if err != nil {
  85. return fs, err
  86. }
  87. fs.svc, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(fs.config.Credentials.GetPayload())))
  88. }
  89. return fs, err
  90. }
  91. // Name returns the name for the Fs implementation
  92. func (fs *GCSFs) Name() string {
  93. return fmt.Sprintf("%s bucket %q", gcsfsName, fs.config.Bucket)
  94. }
  95. // ConnectionID returns the connection ID associated to this Fs implementation
  96. func (fs *GCSFs) ConnectionID() string {
  97. return fs.connectionID
  98. }
  99. // Stat returns a FileInfo describing the named file
  100. func (fs *GCSFs) Stat(name string) (os.FileInfo, error) {
  101. if name == "" || name == "/" || name == "." {
  102. return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
  103. }
  104. if fs.config.KeyPrefix == name+"/" {
  105. return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
  106. }
  107. return fs.getObjectStat(name)
  108. }
  109. // Lstat returns a FileInfo describing the named file
  110. func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) {
  111. return fs.Stat(name)
  112. }
  113. // Open opens the named file for reading
  114. func (fs *GCSFs) Open(name string, offset int64) (File, PipeReader, func(), error) {
  115. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  116. if err != nil {
  117. return nil, nil, nil, err
  118. }
  119. p := NewPipeReader(r)
  120. if readMetadata > 0 {
  121. attrs, err := fs.headObject(name)
  122. if err != nil {
  123. r.Close()
  124. w.Close()
  125. return nil, nil, nil, err
  126. }
  127. p.setMetadata(attrs.Metadata)
  128. }
  129. bkt := fs.svc.Bucket(fs.config.Bucket)
  130. obj := bkt.Object(name)
  131. ctx, cancelFn := context.WithCancel(context.Background())
  132. objectReader, err := obj.NewRangeReader(ctx, offset, -1)
  133. if err == nil && offset > 0 && objectReader.Attrs.ContentEncoding == "gzip" {
  134. err = fmt.Errorf("range request is not possible for gzip content encoding, requested offset %d", offset)
  135. objectReader.Close()
  136. }
  137. if err != nil {
  138. r.Close()
  139. w.Close()
  140. cancelFn()
  141. return nil, nil, nil, err
  142. }
  143. go func() {
  144. defer cancelFn()
  145. defer objectReader.Close()
  146. n, err := io.Copy(w, objectReader)
  147. w.CloseWithError(err) //nolint:errcheck
  148. fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
  149. metric.GCSTransferCompleted(n, 1, err)
  150. }()
  151. return nil, p, cancelFn, nil
  152. }
  153. // Create creates or opens the named file for writing
  154. func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
  155. if checks&CheckParentDir != 0 {
  156. _, err := fs.Stat(path.Dir(name))
  157. if err != nil {
  158. return nil, nil, nil, err
  159. }
  160. }
  161. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  162. if err != nil {
  163. return nil, nil, nil, err
  164. }
  165. var partialFileName string
  166. var attrs *storage.ObjectAttrs
  167. var statErr error
  168. bkt := fs.svc.Bucket(fs.config.Bucket)
  169. obj := bkt.Object(name)
  170. if flag == -1 {
  171. obj = obj.If(storage.Conditions{DoesNotExist: true})
  172. } else {
  173. attrs, statErr = fs.headObject(name)
  174. if statErr == nil {
  175. obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
  176. } else if fs.IsNotExist(statErr) {
  177. obj = obj.If(storage.Conditions{DoesNotExist: true})
  178. } else {
  179. fsLog(fs, logger.LevelWarn, "unable to set precondition for %q, stat err: %v", name, statErr)
  180. }
  181. }
  182. ctx, cancelFn := context.WithCancel(context.Background())
  183. var p PipeWriter
  184. var objectWriter *storage.Writer
  185. if checks&CheckResume != 0 {
  186. if statErr != nil {
  187. cancelFn()
  188. r.Close()
  189. w.Close()
  190. return nil, nil, nil, fmt.Errorf("unable to resume %q stat error: %w", name, statErr)
  191. }
  192. p = newPipeWriterAtOffset(w, attrs.Size)
  193. partialFileName = fs.getTempObject(name)
  194. partialObj := bkt.Object(partialFileName)
  195. partialObj = partialObj.If(storage.Conditions{DoesNotExist: true})
  196. objectWriter = partialObj.NewWriter(ctx)
  197. } else {
  198. p = NewPipeWriter(w)
  199. objectWriter = obj.NewWriter(ctx)
  200. }
  201. if fs.config.UploadPartSize > 0 {
  202. objectWriter.ChunkSize = int(fs.config.UploadPartSize) * 1024 * 1024
  203. }
  204. if fs.config.UploadPartMaxTime > 0 {
  205. objectWriter.ChunkRetryDeadline = time.Duration(fs.config.UploadPartMaxTime) * time.Second
  206. }
  207. fs.setWriterAttrs(objectWriter, flag, name)
  208. go func() {
  209. defer cancelFn()
  210. n, err := io.Copy(objectWriter, r)
  211. closeErr := objectWriter.Close()
  212. if err == nil {
  213. err = closeErr
  214. }
  215. if err == nil && partialFileName != "" {
  216. partialObject := bkt.Object(partialFileName)
  217. partialObject = partialObject.If(storage.Conditions{GenerationMatch: objectWriter.Attrs().Generation})
  218. err = fs.composeObjects(ctx, obj, partialObject)
  219. }
  220. r.CloseWithError(err) //nolint:errcheck
  221. p.Done(err)
  222. fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %v, err: %+v",
  223. name, fs.config.ACL, n, err)
  224. metric.GCSTransferCompleted(n, 0, err)
  225. }()
  226. if uploadMode&8 != 0 {
  227. return nil, p, nil, nil
  228. }
  229. return nil, p, cancelFn, nil
  230. }
  231. // Rename renames (moves) source to target.
  232. func (fs *GCSFs) Rename(source, target string, checks int) (int, int64, error) {
  233. if source == target {
  234. return -1, -1, nil
  235. }
  236. if checks&CheckParentDir != 0 {
  237. _, err := fs.Stat(path.Dir(target))
  238. if err != nil {
  239. return -1, -1, err
  240. }
  241. }
  242. fi, err := fs.getObjectStat(source)
  243. if err != nil {
  244. return -1, -1, err
  245. }
  246. return fs.renameInternal(source, target, fi, 0, checks&CheckUpdateModTime != 0)
  247. }
  248. // Remove removes the named file or (empty) directory.
  249. func (fs *GCSFs) Remove(name string, isDir bool) error {
  250. if isDir {
  251. hasContents, err := fs.hasContents(name)
  252. if err != nil {
  253. return err
  254. }
  255. if hasContents {
  256. return fmt.Errorf("cannot remove non empty directory: %q", name)
  257. }
  258. if !strings.HasSuffix(name, "/") {
  259. name += "/"
  260. }
  261. }
  262. obj := fs.svc.Bucket(fs.config.Bucket).Object(name)
  263. attrs, statErr := fs.headObject(name)
  264. if statErr == nil {
  265. obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
  266. } else {
  267. fsLog(fs, logger.LevelWarn, "unable to set precondition for deleting %q, stat err: %v",
  268. name, statErr)
  269. }
  270. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  271. defer cancelFn()
  272. err := obj.Delete(ctx)
  273. if isDir && fs.IsNotExist(err) {
  274. // we can have directories without a trailing "/" (created using v2.1.0 and before)
  275. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  276. defer cancelFn()
  277. err = fs.svc.Bucket(fs.config.Bucket).Object(strings.TrimSuffix(name, "/")).Delete(ctx)
  278. }
  279. metric.GCSDeleteObjectCompleted(err)
  280. return err
  281. }
  282. // Mkdir creates a new directory with the specified name and default permissions
  283. func (fs *GCSFs) Mkdir(name string) error {
  284. _, err := fs.Stat(name)
  285. if !fs.IsNotExist(err) {
  286. return err
  287. }
  288. return fs.mkdirInternal(name)
  289. }
  290. // Symlink creates source as a symbolic link to target.
  291. func (*GCSFs) Symlink(_, _ string) error {
  292. return ErrVfsUnsupported
  293. }
  294. // Readlink returns the destination of the named symbolic link
  295. func (*GCSFs) Readlink(_ string) (string, error) {
  296. return "", ErrVfsUnsupported
  297. }
  298. // Chown changes the numeric uid and gid of the named file.
  299. func (*GCSFs) Chown(_ string, _ int, _ int) error {
  300. return ErrVfsUnsupported
  301. }
  302. // Chmod changes the mode of the named file to mode.
  303. func (*GCSFs) Chmod(_ string, _ os.FileMode) error {
  304. return ErrVfsUnsupported
  305. }
  306. // Chtimes changes the access and modification times of the named file.
  307. func (fs *GCSFs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
  308. if isUploading {
  309. return nil
  310. }
  311. obj := fs.svc.Bucket(fs.config.Bucket).Object(name)
  312. attrs, err := fs.headObject(name)
  313. if err != nil {
  314. return err
  315. }
  316. obj = obj.If(storage.Conditions{MetagenerationMatch: attrs.Metageneration})
  317. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  318. defer cancelFn()
  319. metadata := attrs.Metadata
  320. if metadata == nil {
  321. metadata = make(map[string]string)
  322. }
  323. metadata[lastModifiedField] = strconv.FormatInt(mtime.UnixMilli(), 10)
  324. objectAttrsToUpdate := storage.ObjectAttrsToUpdate{
  325. Metadata: metadata,
  326. }
  327. _, err = obj.Update(ctx, objectAttrsToUpdate)
  328. return err
  329. }
  330. // Truncate changes the size of the named file.
  331. // Truncate by path is not supported, while truncating an opened
  332. // file is handled inside base transfer
  333. func (*GCSFs) Truncate(_ string, _ int64) error {
  334. return ErrVfsUnsupported
  335. }
  336. // ReadDir reads the directory named by dirname and returns
  337. // a list of directory entries.
  338. func (fs *GCSFs) ReadDir(dirname string) (DirLister, error) {
  339. // dirname must be already cleaned
  340. prefix := fs.getPrefix(dirname)
  341. query := &storage.Query{Prefix: prefix, Delimiter: "/"}
  342. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  343. if err != nil {
  344. return nil, err
  345. }
  346. bkt := fs.svc.Bucket(fs.config.Bucket)
  347. return &gcsDirLister{
  348. bucket: bkt,
  349. query: query,
  350. timeout: fs.ctxTimeout,
  351. prefix: prefix,
  352. prefixes: make(map[string]bool),
  353. }, nil
  354. }
  355. // IsUploadResumeSupported returns true if resuming uploads is supported.
  356. // Resuming uploads is not supported on GCS
  357. func (*GCSFs) IsUploadResumeSupported() bool {
  358. return false
  359. }
  360. // IsConditionalUploadResumeSupported returns if resuming uploads is supported
  361. // for the specified size
  362. func (*GCSFs) IsConditionalUploadResumeSupported(_ int64) bool {
  363. return true
  364. }
  365. // IsAtomicUploadSupported returns true if atomic upload is supported.
  366. // S3 uploads are already atomic, we don't need to upload to a temporary
  367. // file
  368. func (*GCSFs) IsAtomicUploadSupported() bool {
  369. return false
  370. }
  371. // IsNotExist returns a boolean indicating whether the error is known to
  372. // report that a file or directory does not exist
  373. func (*GCSFs) IsNotExist(err error) bool {
  374. if err == nil {
  375. return false
  376. }
  377. if err == storage.ErrObjectNotExist || err == storage.ErrBucketNotExist {
  378. return true
  379. }
  380. if e, ok := err.(*googleapi.Error); ok {
  381. if e.Code == http.StatusNotFound {
  382. return true
  383. }
  384. }
  385. return false
  386. }
  387. // IsPermission returns a boolean indicating whether the error is known to
  388. // report that permission is denied.
  389. func (*GCSFs) IsPermission(err error) bool {
  390. if err == nil {
  391. return false
  392. }
  393. if e, ok := err.(*googleapi.Error); ok {
  394. if e.Code == http.StatusForbidden || e.Code == http.StatusUnauthorized {
  395. return true
  396. }
  397. }
  398. return false
  399. }
  400. // IsNotSupported returns true if the error indicate an unsupported operation
  401. func (*GCSFs) IsNotSupported(err error) bool {
  402. if err == nil {
  403. return false
  404. }
  405. return errors.Is(err, ErrVfsUnsupported)
  406. }
  407. // CheckRootPath creates the specified local root directory if it does not exists
  408. func (fs *GCSFs) CheckRootPath(username string, uid int, gid int) bool {
  409. // we need a local directory for temporary files
  410. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
  411. return osFs.CheckRootPath(username, uid, gid)
  412. }
  413. // ScanRootDirContents returns the number of files contained in the bucket,
  414. // and their size
  415. func (fs *GCSFs) ScanRootDirContents() (int, int64, error) {
  416. return fs.GetDirSize(fs.config.KeyPrefix)
  417. }
  418. // GetDirSize returns the number of files and the size for a folder
  419. // including any subfolders
  420. func (fs *GCSFs) GetDirSize(dirname string) (int, int64, error) {
  421. prefix := fs.getPrefix(dirname)
  422. numFiles := 0
  423. size := int64(0)
  424. query := &storage.Query{Prefix: prefix}
  425. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  426. if err != nil {
  427. return numFiles, size, err
  428. }
  429. iteratePage := func(nextPageToken string) (string, error) {
  430. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  431. defer cancelFn()
  432. bkt := fs.svc.Bucket(fs.config.Bucket)
  433. it := bkt.Objects(ctx, query)
  434. pager := iterator.NewPager(it, defaultGCSPageSize, nextPageToken)
  435. var objects []*storage.ObjectAttrs
  436. pageToken, err := pager.NextPage(&objects)
  437. if err != nil {
  438. return pageToken, err
  439. }
  440. for _, attrs := range objects {
  441. if !attrs.Deleted.IsZero() {
  442. continue
  443. }
  444. isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
  445. if isDir && attrs.Size == 0 {
  446. continue
  447. }
  448. numFiles++
  449. size += attrs.Size
  450. }
  451. return pageToken, nil
  452. }
  453. pageToken := ""
  454. for {
  455. pageToken, err = iteratePage(pageToken)
  456. if err != nil {
  457. metric.GCSListObjectsCompleted(err)
  458. return numFiles, size, err
  459. }
  460. fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
  461. if pageToken == "" {
  462. break
  463. }
  464. }
  465. metric.GCSListObjectsCompleted(nil)
  466. return numFiles, size, err
  467. }
  468. // GetAtomicUploadPath returns the path to use for an atomic upload.
  469. // GCS uploads are already atomic, we never call this method for GCS
  470. func (*GCSFs) GetAtomicUploadPath(_ string) string {
  471. return ""
  472. }
  473. // GetRelativePath returns the path for a file relative to the user's home dir.
  474. // This is the path as seen by SFTPGo users
  475. func (fs *GCSFs) GetRelativePath(name string) string {
  476. rel := path.Clean(name)
  477. if rel == "." {
  478. rel = ""
  479. }
  480. if !path.IsAbs(rel) {
  481. rel = "/" + rel
  482. }
  483. if fs.config.KeyPrefix != "" {
  484. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  485. rel = "/"
  486. }
  487. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  488. }
  489. if fs.mountPath != "" {
  490. rel = path.Join(fs.mountPath, rel)
  491. }
  492. return rel
  493. }
  494. // Walk walks the file tree rooted at root, calling walkFn for each file or
  495. // directory in the tree, including root
  496. func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
  497. prefix := fs.getPrefix(root)
  498. query := &storage.Query{Prefix: prefix}
  499. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  500. if err != nil {
  501. walkFn(root, nil, err) //nolint:errcheck
  502. return err
  503. }
  504. iteratePage := func(nextPageToken string) (string, error) {
  505. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  506. defer cancelFn()
  507. bkt := fs.svc.Bucket(fs.config.Bucket)
  508. it := bkt.Objects(ctx, query)
  509. pager := iterator.NewPager(it, defaultGCSPageSize, nextPageToken)
  510. var objects []*storage.ObjectAttrs
  511. pageToken, err := pager.NextPage(&objects)
  512. if err != nil {
  513. walkFn(root, nil, err) //nolint:errcheck
  514. return pageToken, err
  515. }
  516. for _, attrs := range objects {
  517. if !attrs.Deleted.IsZero() {
  518. continue
  519. }
  520. name, isDir := fs.resolve(attrs.Name, prefix, attrs.ContentType)
  521. if name == "" {
  522. continue
  523. }
  524. objectModTime := attrs.Updated
  525. if val := getLastModified(attrs.Metadata); val > 0 {
  526. objectModTime = util.GetTimeFromMsecSinceEpoch(val)
  527. }
  528. err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, objectModTime, false), nil)
  529. if err != nil {
  530. return pageToken, err
  531. }
  532. }
  533. return pageToken, nil
  534. }
  535. pageToken := ""
  536. for {
  537. pageToken, err = iteratePage(pageToken)
  538. if err != nil {
  539. metric.GCSListObjectsCompleted(err)
  540. return err
  541. }
  542. if pageToken == "" {
  543. break
  544. }
  545. }
  546. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
  547. metric.GCSListObjectsCompleted(err)
  548. return err
  549. }
  550. // Join joins any number of path elements into a single path
  551. func (*GCSFs) Join(elem ...string) string {
  552. return strings.TrimPrefix(path.Join(elem...), "/")
  553. }
  554. // HasVirtualFolders returns true if folders are emulated
  555. func (GCSFs) HasVirtualFolders() bool {
  556. return true
  557. }
  558. // ResolvePath returns the matching filesystem path for the specified virtual path
  559. func (fs *GCSFs) ResolvePath(virtualPath string) (string, error) {
  560. if fs.mountPath != "" {
  561. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  562. }
  563. if !path.IsAbs(virtualPath) {
  564. virtualPath = path.Clean("/" + virtualPath)
  565. }
  566. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  567. }
  568. // CopyFile implements the FsFileCopier interface
  569. func (fs *GCSFs) CopyFile(source, target string, srcInfo os.FileInfo) (int, int64, error) {
  570. numFiles := 1
  571. sizeDiff := srcInfo.Size()
  572. var conditions *storage.Conditions
  573. attrs, err := fs.headObject(target)
  574. if err == nil {
  575. sizeDiff -= attrs.Size
  576. numFiles = 0
  577. conditions = &storage.Conditions{GenerationMatch: attrs.Generation}
  578. } else {
  579. if !fs.IsNotExist(err) {
  580. return 0, 0, err
  581. }
  582. conditions = &storage.Conditions{DoesNotExist: true}
  583. }
  584. if err := fs.copyFileInternal(source, target, conditions, srcInfo, true); err != nil {
  585. return 0, 0, err
  586. }
  587. return numFiles, sizeDiff, nil
  588. }
  589. func (fs *GCSFs) resolve(name, prefix, contentType string) (string, bool) {
  590. result := strings.TrimPrefix(name, prefix)
  591. isDir := strings.HasSuffix(result, "/")
  592. if isDir {
  593. result = strings.TrimSuffix(result, "/")
  594. }
  595. if contentType == dirMimeType {
  596. isDir = true
  597. }
  598. return result, isDir
  599. }
  600. // getObjectStat returns the stat result
  601. func (fs *GCSFs) getObjectStat(name string) (os.FileInfo, error) {
  602. attrs, err := fs.headObject(name)
  603. if err == nil {
  604. objSize := attrs.Size
  605. objectModTime := attrs.Updated
  606. if val := getLastModified(attrs.Metadata); val > 0 {
  607. objectModTime = util.GetTimeFromMsecSinceEpoch(val)
  608. }
  609. isDir := attrs.ContentType == dirMimeType || strings.HasSuffix(attrs.Name, "/")
  610. info := NewFileInfo(name, isDir, objSize, objectModTime, false)
  611. if !isDir {
  612. info.setMetadata(attrs.Metadata)
  613. }
  614. return info, nil
  615. }
  616. if !fs.IsNotExist(err) {
  617. return nil, err
  618. }
  619. // now check if this is a prefix (virtual directory)
  620. hasContents, err := fs.hasContents(name)
  621. if err != nil {
  622. return nil, err
  623. }
  624. if hasContents {
  625. return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
  626. }
  627. // finally check if this is an object with a trailing /
  628. attrs, err = fs.headObject(name + "/")
  629. if err != nil {
  630. return nil, err
  631. }
  632. objectModTime := attrs.Updated
  633. if val := getLastModified(attrs.Metadata); val > 0 {
  634. objectModTime = util.GetTimeFromMsecSinceEpoch(val)
  635. }
  636. return NewFileInfo(name, true, attrs.Size, objectModTime, false), nil
  637. }
  638. func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, flag int, name string) {
  639. var contentType string
  640. if flag == -1 {
  641. contentType = dirMimeType
  642. } else {
  643. contentType = mime.TypeByExtension(path.Ext(name))
  644. }
  645. if contentType != "" {
  646. objectWriter.ObjectAttrs.ContentType = contentType
  647. }
  648. if fs.config.StorageClass != "" {
  649. objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass
  650. }
  651. if fs.config.ACL != "" {
  652. objectWriter.PredefinedACL = fs.config.ACL
  653. }
  654. }
  655. func (fs *GCSFs) composeObjects(ctx context.Context, dst, partialObject *storage.ObjectHandle) error {
  656. fsLog(fs, logger.LevelDebug, "start object compose for partial file %q, destination %q",
  657. partialObject.ObjectName(), dst.ObjectName())
  658. composer := dst.ComposerFrom(dst, partialObject)
  659. if fs.config.StorageClass != "" {
  660. composer.StorageClass = fs.config.StorageClass
  661. }
  662. if fs.config.ACL != "" {
  663. composer.PredefinedACL = fs.config.ACL
  664. }
  665. contentType := mime.TypeByExtension(path.Ext(dst.ObjectName()))
  666. if contentType != "" {
  667. composer.ContentType = contentType
  668. }
  669. _, err := composer.Run(ctx)
  670. fsLog(fs, logger.LevelDebug, "object compose for %q finished, err: %v", dst.ObjectName(), err)
  671. delCtx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  672. defer cancelFn()
  673. errDelete := partialObject.Delete(delCtx)
  674. metric.GCSDeleteObjectCompleted(errDelete)
  675. fsLog(fs, logger.LevelDebug, "deleted partial file %q after composing with %q, err: %v",
  676. partialObject.ObjectName(), dst.ObjectName(), errDelete)
  677. return err
  678. }
  679. func (fs *GCSFs) copyFileInternal(source, target string, conditions *storage.Conditions,
  680. srcInfo os.FileInfo, updateModTime bool,
  681. ) error {
  682. src := fs.svc.Bucket(fs.config.Bucket).Object(source)
  683. dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
  684. if conditions != nil {
  685. dst = dst.If(*conditions)
  686. } else {
  687. attrs, err := fs.headObject(target)
  688. if err == nil {
  689. dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation})
  690. } else if fs.IsNotExist(err) {
  691. dst = dst.If(storage.Conditions{DoesNotExist: true})
  692. } else {
  693. fsLog(fs, logger.LevelWarn, "unable to set precondition for copy, target %q, stat err: %v",
  694. target, err)
  695. }
  696. }
  697. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  698. defer cancelFn()
  699. copier := dst.CopierFrom(src)
  700. if fs.config.StorageClass != "" {
  701. copier.StorageClass = fs.config.StorageClass
  702. }
  703. if fs.config.ACL != "" {
  704. copier.PredefinedACL = fs.config.ACL
  705. }
  706. contentType := mime.TypeByExtension(path.Ext(source))
  707. if contentType != "" {
  708. copier.ContentType = contentType
  709. }
  710. metadata := getMetadata(srcInfo)
  711. if updateModTime && len(metadata) > 0 {
  712. delete(metadata, lastModifiedField)
  713. }
  714. if len(metadata) > 0 {
  715. copier.Metadata = metadata
  716. }
  717. _, err := copier.Run(ctx)
  718. metric.GCSCopyObjectCompleted(err)
  719. return err
  720. }
  721. func (fs *GCSFs) renameInternal(source, target string, srcInfo os.FileInfo, recursion int,
  722. updateModTime bool,
  723. ) (int, int64, error) {
  724. var numFiles int
  725. var filesSize int64
  726. if srcInfo.IsDir() {
  727. if renameMode == 0 {
  728. hasContents, err := fs.hasContents(source)
  729. if err != nil {
  730. return numFiles, filesSize, err
  731. }
  732. if hasContents {
  733. return numFiles, filesSize, fmt.Errorf("%w: cannot rename non empty directory: %q", ErrVfsUnsupported, source)
  734. }
  735. }
  736. if err := fs.mkdirInternal(target); err != nil {
  737. return numFiles, filesSize, err
  738. }
  739. if renameMode == 1 {
  740. files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion, updateModTime)
  741. numFiles += files
  742. filesSize += size
  743. if err != nil {
  744. return numFiles, filesSize, err
  745. }
  746. }
  747. } else {
  748. if err := fs.copyFileInternal(source, target, nil, srcInfo, updateModTime); err != nil {
  749. return numFiles, filesSize, err
  750. }
  751. numFiles++
  752. filesSize += srcInfo.Size()
  753. }
  754. err := fs.Remove(source, srcInfo.IsDir())
  755. if fs.IsNotExist(err) {
  756. err = nil
  757. }
  758. return numFiles, filesSize, err
  759. }
  760. func (fs *GCSFs) mkdirInternal(name string) error {
  761. if !strings.HasSuffix(name, "/") {
  762. name += "/"
  763. }
  764. _, w, _, err := fs.Create(name, -1, 0)
  765. if err != nil {
  766. return err
  767. }
  768. return w.Close()
  769. }
  770. func (fs *GCSFs) hasContents(name string) (bool, error) {
  771. result := false
  772. prefix := fs.getPrefix(name)
  773. query := &storage.Query{Prefix: prefix}
  774. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  775. if err != nil {
  776. return result, err
  777. }
  778. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  779. defer cancelFn()
  780. bkt := fs.svc.Bucket(fs.config.Bucket)
  781. it := bkt.Objects(ctx, query)
  782. // if we have a dir object with a trailing slash it will be returned so we set the size to 2
  783. pager := iterator.NewPager(it, 2, "")
  784. var objects []*storage.ObjectAttrs
  785. _, err = pager.NextPage(&objects)
  786. if err != nil {
  787. metric.GCSListObjectsCompleted(err)
  788. return result, err
  789. }
  790. for _, attrs := range objects {
  791. name, _ := fs.resolve(attrs.Name, prefix, attrs.ContentType)
  792. // a dir object with a trailing slash will result in an empty name
  793. if name == "/" || name == "" {
  794. continue
  795. }
  796. result = true
  797. break
  798. }
  799. metric.GCSListObjectsCompleted(nil)
  800. return result, nil
  801. }
  802. func (fs *GCSFs) getPrefix(name string) string {
  803. prefix := ""
  804. if name != "" && name != "." && name != "/" {
  805. prefix = strings.TrimPrefix(name, "/")
  806. if !strings.HasSuffix(prefix, "/") {
  807. prefix += "/"
  808. }
  809. }
  810. return prefix
  811. }
  812. func (fs *GCSFs) headObject(name string) (*storage.ObjectAttrs, error) {
  813. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  814. defer cancelFn()
  815. bkt := fs.svc.Bucket(fs.config.Bucket)
  816. obj := bkt.Object(name)
  817. attrs, err := obj.Attrs(ctx)
  818. metric.GCSHeadObjectCompleted(err)
  819. return attrs, err
  820. }
  821. // GetMimeType returns the content type
  822. func (fs *GCSFs) GetMimeType(name string) (string, error) {
  823. attrs, err := fs.headObject(name)
  824. if err != nil {
  825. return "", err
  826. }
  827. return attrs.ContentType, nil
  828. }
  829. // Close closes the fs
  830. func (fs *GCSFs) Close() error {
  831. return nil
  832. }
  833. // GetAvailableDiskSize returns the available size for the specified path
  834. func (*GCSFs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
  835. return nil, ErrStorageSizeUnavailable
  836. }
  837. func (*GCSFs) getTempObject(name string) string {
  838. dir := filepath.Dir(name)
  839. guid := xid.New().String()
  840. return filepath.Join(dir, ".sftpgo-partial."+guid+"."+filepath.Base(name))
  841. }
  842. type gcsDirLister struct {
  843. baseDirLister
  844. bucket *storage.BucketHandle
  845. query *storage.Query
  846. timeout time.Duration
  847. nextPageToken string
  848. noMorePages bool
  849. prefix string
  850. prefixes map[string]bool
  851. metricUpdated bool
  852. }
  853. func (l *gcsDirLister) resolve(name, contentType string) (string, bool) {
  854. result := strings.TrimPrefix(name, l.prefix)
  855. isDir := strings.HasSuffix(result, "/")
  856. if isDir {
  857. result = strings.TrimSuffix(result, "/")
  858. }
  859. if contentType == dirMimeType {
  860. isDir = true
  861. }
  862. return result, isDir
  863. }
  864. func (l *gcsDirLister) Next(limit int) ([]os.FileInfo, error) {
  865. if limit <= 0 {
  866. return nil, errInvalidDirListerLimit
  867. }
  868. if len(l.cache) >= limit {
  869. return l.returnFromCache(limit), nil
  870. }
  871. if l.noMorePages {
  872. if !l.metricUpdated {
  873. l.metricUpdated = true
  874. metric.GCSListObjectsCompleted(nil)
  875. }
  876. return l.returnFromCache(limit), io.EOF
  877. }
  878. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(l.timeout))
  879. defer cancelFn()
  880. it := l.bucket.Objects(ctx, l.query)
  881. paginator := iterator.NewPager(it, defaultGCSPageSize, l.nextPageToken)
  882. var objects []*storage.ObjectAttrs
  883. pageToken, err := paginator.NextPage(&objects)
  884. if err != nil {
  885. metric.GCSListObjectsCompleted(err)
  886. return l.cache, err
  887. }
  888. for _, attrs := range objects {
  889. if attrs.Prefix != "" {
  890. name, _ := l.resolve(attrs.Prefix, attrs.ContentType)
  891. if name == "" {
  892. continue
  893. }
  894. if _, ok := l.prefixes[name]; ok {
  895. continue
  896. }
  897. l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  898. l.prefixes[name] = true
  899. } else {
  900. name, isDir := l.resolve(attrs.Name, attrs.ContentType)
  901. if name == "" {
  902. continue
  903. }
  904. if !attrs.Deleted.IsZero() {
  905. continue
  906. }
  907. if isDir {
  908. // check if the dir is already included, it will be sent as blob prefix if it contains at least one item
  909. if _, ok := l.prefixes[name]; ok {
  910. continue
  911. }
  912. l.prefixes[name] = true
  913. }
  914. modTime := attrs.Updated
  915. if val := getLastModified(attrs.Metadata); val > 0 {
  916. modTime = util.GetTimeFromMsecSinceEpoch(val)
  917. }
  918. info := NewFileInfo(name, isDir, attrs.Size, modTime, false)
  919. info.setMetadata(attrs.Metadata)
  920. l.cache = append(l.cache, info)
  921. }
  922. }
  923. l.nextPageToken = pageToken
  924. l.noMorePages = (l.nextPageToken == "")
  925. return l.returnFromCache(limit), nil
  926. }
  927. func (l *gcsDirLister) Close() error {
  928. clear(l.prefixes)
  929. return l.baseDirLister.Close()
  930. }