vfs.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "slices"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "github.com/eikenb/pipeat"
  31. "github.com/pkg/sftp"
  32. "github.com/sftpgo/sdk"
  33. "github.com/drakkan/sftpgo/v2/internal/kms"
  34. "github.com/drakkan/sftpgo/v2/internal/logger"
  35. "github.com/drakkan/sftpgo/v2/internal/util"
  36. )
  37. const (
  38. dirMimeType = "inode/directory"
  39. s3fsName = "S3Fs"
  40. gcsfsName = "GCSFs"
  41. azBlobFsName = "AzureBlobFs"
  42. lastModifiedField = "sftpgo_last_modified"
  43. preResumeTimeout = 90 * time.Second
  44. // ListerBatchSize defines the default limit for DirLister implementations
  45. ListerBatchSize = 1000
  46. )
  47. // Additional checks for files
  48. const (
  49. CheckParentDir = 1
  50. CheckResume = 2
  51. )
  52. var (
  53. validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
  54. // ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
  55. ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
  56. // ErrVfsUnsupported defines the error for an unsupported VFS operation
  57. ErrVfsUnsupported = errors.New("not supported")
  58. errInvalidDirListerLimit = errors.New("dir lister: invalid limit, must be > 0")
  59. tempPath string
  60. sftpFingerprints []string
  61. allowSelfConnections int
  62. renameMode int
  63. readMetadata int
  64. resumeMaxSize int64
  65. uploadMode int
  66. )
// SetAllowSelfConnections sets the desired behaviour for self connections.
// The value is stored in a package-level variable; the accepted values are
// defined by the callers (TODO(review): document the valid modes here).
func SetAllowSelfConnections(value int) {
	allowSelfConnections = value
}
// SetTempPath sets the path for temporary files.
// An empty value leaves the choice of the temporary directory to the backends.
func SetTempPath(fsPath string) {
	tempPath = fsPath
}
// GetTempPath returns the path for temporary files as set via SetTempPath.
func GetTempPath() string {
	return tempPath
}
// SetSFTPFingerprints sets the SFTP host key fingerprints.
// NOTE(review): the slice is stored without copying, so callers must not
// mutate it after this call.
func SetSFTPFingerprints(fp []string) {
	sftpFingerprints = fp
}
// SetRenameMode sets the rename mode used by filesystem backends.
func SetRenameMode(val int) {
	renameMode = val
}
// SetReadMetadataMode sets the read metadata mode used by filesystem backends.
func SetReadMetadataMode(val int) {
	readMetadata = val
}
// SetResumeMaxSize sets the max size allowed for resuming uploads for backends
// with immutable objects.
func SetResumeMaxSize(val int64) {
	resumeMaxSize = val
}
// SetUploadMode sets the upload mode used by filesystem backends.
func SetUploadMode(val int) {
	uploadMode = val
}
// Fs defines the interface for filesystem backends
type Fs interface {
	// Identification.
	Name() string
	ConnectionID() string

	// Basic file and directory operations. Open/Create return either a File
	// or a pipe end, depending on the backend, plus a cancel function.
	Stat(name string) (os.FileInfo, error)
	Lstat(name string) (os.FileInfo, error)
	Open(name string, offset int64) (File, PipeReader, func(), error)
	Create(name string, flag, checks int) (File, PipeWriter, func(), error)
	Rename(source, target string) (int, int64, error)
	Remove(name string, isDir bool) error
	Mkdir(name string) error
	Symlink(source, target string) error
	Chown(name string, uid int, gid int) error
	Chmod(name string, mode os.FileMode) error
	Chtimes(name string, atime, mtime time.Time, isUploading bool) error
	Truncate(name string, size int64) error
	ReadDir(dirname string) (DirLister, error)
	Readlink(name string) (string, error)

	// Backend capability queries.
	IsUploadResumeSupported() bool
	IsConditionalUploadResumeSupported(size int64) bool
	IsAtomicUploadSupported() bool

	// Path resolution and error classification.
	CheckRootPath(username string, uid int, gid int) bool
	ResolvePath(virtualPath string) (string, error)
	IsNotExist(err error) bool
	IsPermission(err error) bool
	IsNotSupported(err error) bool

	// Quota/size scanning and path helpers.
	ScanRootDirContents() (int, int64, error)
	GetDirSize(dirname string) (int, int64, error)
	GetAtomicUploadPath(name string) string
	GetRelativePath(name string) string
	Walk(root string, walkFn filepath.WalkFunc) error
	Join(elem ...string) string
	HasVirtualFolders() bool
	GetMimeType(name string) (string, error)
	GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
	Close() error
}
// FsRealPather is a Fs that implements the RealPath method.
type FsRealPather interface {
	Fs
	// RealPath returns the real (resolved) path for p.
	RealPath(p string) (string, error)
}
// FsFileCopier is a Fs that implements the CopyFile method.
type FsFileCopier interface {
	Fs
	// CopyFile copies source to target. srcSize is the size of the source
	// file; the meaning of the returned ints mirrors Rename.
	CopyFile(source, target string, srcSize int64) (int, int64, error)
}
// File defines an interface representing a SFTPGo file.
// It combines the standard io interfaces with Stat/Name/Truncate so that
// *os.File satisfies it directly.
type File interface {
	io.Reader
	io.Writer
	io.Closer
	io.ReaderAt
	io.WriterAt
	io.Seeker
	Stat() (os.FileInfo, error)
	Name() string
	Truncate(size int64) error
}
// PipeWriter defines an interface representing a SFTPGo pipe writer
type PipeWriter interface {
	io.Writer
	io.WriterAt
	io.Closer
	// Done signals the end of the upload with its final error, unblocking Close.
	Done(err error)
	// GetWrittenBytes returns the number of bytes written so far.
	GetWrittenBytes() int64
}
// PipeReader defines an interface representing a SFTPGo pipe reader
type PipeReader interface {
	io.Reader
	io.ReaderAt
	io.Closer
	// setMetadata stores backend metadata for the file being read.
	setMetadata(value map[string]string)
	// setMetadataFromPointerVal stores metadata given as string pointers
	// (the shape used by some cloud SDKs).
	setMetadataFromPointerVal(value map[string]*string)
	// Metadata returns the stored metadata.
	Metadata() map[string]string
}
// DirLister defines an interface for a directory lister
type DirLister interface {
	// Next returns up to limit directory entries. It returns io.EOF when
	// there are no more entries.
	Next(limit int) ([]os.FileInfo, error)
	// Close releases the resources associated with the lister.
	Close() error
}
// Metadater defines an interface to implement to return metadata for a file
type Metadater interface {
	Metadata() map[string]string
}
  185. type baseDirLister struct {
  186. cache []os.FileInfo
  187. }
  188. func (l *baseDirLister) Next(limit int) ([]os.FileInfo, error) {
  189. if limit <= 0 {
  190. return nil, errInvalidDirListerLimit
  191. }
  192. if len(l.cache) >= limit {
  193. return l.returnFromCache(limit), nil
  194. }
  195. return l.returnFromCache(limit), io.EOF
  196. }
  197. func (l *baseDirLister) returnFromCache(limit int) []os.FileInfo {
  198. if len(l.cache) >= limit {
  199. result := l.cache[:limit]
  200. l.cache = l.cache[limit:]
  201. return result
  202. }
  203. result := l.cache
  204. l.cache = nil
  205. return result
  206. }
  207. func (l *baseDirLister) Close() error {
  208. l.cache = nil
  209. return nil
  210. }
  211. // QuotaCheckResult defines the result for a quota check
  212. type QuotaCheckResult struct {
  213. HasSpace bool
  214. AllowedSize int64
  215. AllowedFiles int
  216. UsedSize int64
  217. UsedFiles int
  218. QuotaSize int64
  219. QuotaFiles int
  220. }
  221. // GetRemainingSize returns the remaining allowed size
  222. func (q *QuotaCheckResult) GetRemainingSize() int64 {
  223. if q.QuotaSize > 0 {
  224. return q.QuotaSize - q.UsedSize
  225. }
  226. return 0
  227. }
  228. // GetRemainingFiles returns the remaining allowed files
  229. func (q *QuotaCheckResult) GetRemainingFiles() int {
  230. if q.QuotaFiles > 0 {
  231. return q.QuotaFiles - q.UsedFiles
  232. }
  233. return 0
  234. }
// S3FsConfig defines the configuration for S3 based filesystem
type S3FsConfig struct {
	sdk.BaseS3FsConfig
	// AccessSecret is stored encrypted based on the kms configuration.
	AccessSecret *kms.Secret `json:"access_secret,omitempty"`
}
// HideConfidentialData hides confidential data
func (c *S3FsConfig) HideConfidentialData() {
	if c.AccessSecret != nil {
		c.AccessSecret.Hide()
	}
}
// isEqual reports whether the two S3 configurations are identical, including
// the access secret. As a side effect nil secrets are normalized to empty
// ones via isSecretEqual.
func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
	if c.Bucket != other.Bucket {
		return false
	}
	if c.KeyPrefix != other.KeyPrefix {
		return false
	}
	if c.Region != other.Region {
		return false
	}
	if c.AccessKey != other.AccessKey {
		return false
	}
	if c.RoleARN != other.RoleARN {
		return false
	}
	if c.Endpoint != other.Endpoint {
		return false
	}
	if c.StorageClass != other.StorageClass {
		return false
	}
	if c.ACL != other.ACL {
		return false
	}
	if !c.areMultipartFieldsEqual(other) {
		return false
	}
	if c.ForcePathStyle != other.ForcePathStyle {
		return false
	}
	if c.SkipTLSVerify != other.SkipTLSVerify {
		return false
	}
	return c.isSecretEqual(other)
}
  282. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  283. if c.UploadPartSize != other.UploadPartSize {
  284. return false
  285. }
  286. if c.UploadConcurrency != other.UploadConcurrency {
  287. return false
  288. }
  289. if c.DownloadConcurrency != other.DownloadConcurrency {
  290. return false
  291. }
  292. if c.DownloadPartSize != other.DownloadPartSize {
  293. return false
  294. }
  295. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  296. return false
  297. }
  298. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  299. return false
  300. }
  301. return true
  302. }
// isSecretEqual compares the access secrets, normalizing nil secrets to
// empty ones on both sides first (this mutates both receivers).
func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
	if c.AccessSecret == nil {
		c.AccessSecret = kms.NewEmptySecret()
	}
	if other.AccessSecret == nil {
		other.AccessSecret = kms.NewEmptySecret()
	}
	return c.AccessSecret.IsEqual(other.AccessSecret)
}
// checkCredentials validates the static credentials: access key and access
// secret must be set together (both empty is allowed, e.g. for IAM roles),
// and the secret must be a valid, decryptable value.
func (c *S3FsConfig) checkCredentials() error {
	if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
		return util.NewI18nError(
			errors.New("access_key cannot be empty with access_secret not empty"),
			util.I18nErrorAccessKeyRequired,
		)
	}
	if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
		return util.NewI18nError(
			errors.New("access_secret cannot be empty with access_key not empty"),
			util.I18nErrorAccessSecretRequired,
		)
	}
	if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
		return errors.New("invalid encrypted access_secret")
	}
	if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
		return errors.New("invalid access_secret")
	}
	return nil
}
// ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text.
// additionalData is mixed into the encryption as extra authenticated data.
// Validation errors are wrapped preserving any i18n message from validate.
func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.AccessSecret.IsPlain() {
		c.AccessSecret.SetAdditionalData(additionalData)
		err := c.AccessSecret.Encrypt()
		if err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}
// checkPartSizeAndConcurrency validates the multipart transfer tuning.
// Part sizes are expressed in MB: 0 means "use the SDK default", otherwise
// the value must be within [5, 5000]. Concurrency must be within [0, 64].
func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
	if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
		return util.NewI18nError(
			errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
			util.I18nErrorULPartSizeInvalid,
		)
	}
	if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
			util.I18nErrorULConcurrencyInvalid,
		)
	}
	if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
		return util.NewI18nError(
			errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
			util.I18nErrorDLPartSizeInvalid,
		)
	}
	if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency),
			util.I18nErrorDLConcurrencyInvalid,
		)
	}
	return nil
}
  382. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  383. if c.Bucket != other.Bucket {
  384. return false
  385. }
  386. if c.Endpoint != other.Endpoint {
  387. return false
  388. }
  389. return c.Region == other.Region
  390. }
// validate returns an error if the configuration is not valid.
// It also normalizes the configuration: a nil secret becomes an empty one,
// the key prefix is cleaned and forced to end with "/", and storage class
// and ACL are trimmed.
func (c *S3FsConfig) validate() error {
	if c.AccessSecret == nil {
		c.AccessSecret = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	// the region may be embedded within the endpoint for some S3 compatible
	// object storage, for example B2
	if c.Endpoint == "" && c.Region == "" {
		return util.NewI18nError(errors.New("region cannot be empty"), util.I18nErrorRegionRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		// the prefix must be a relative "folder": no leading "/", always a
		// trailing "/"
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	return c.checkPartSizeAndConcurrency()
}
// GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
type GCSFsConfig struct {
	sdk.BaseGCSFsConfig
	// Credentials is the Google service account JSON, stored encrypted
	// based on the kms configuration.
	Credentials *kms.Secret `json:"credentials,omitempty"`
}
// HideConfidentialData hides confidential data
func (c *GCSFsConfig) HideConfidentialData() {
	if c.Credentials != nil {
		c.Credentials.Hide()
	}
}
// ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text.
// additionalData is mixed into the encryption as extra authenticated data.
// Validation errors are wrapped preserving any i18n message from validate.
func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.Credentials.IsPlain() {
		c.Credentials.SetAdditionalData(additionalData)
		err := c.Credentials.Encrypt()
		if err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}
// isEqual reports whether the two GCS configurations are identical,
// including the credentials. As a side effect nil credentials are
// normalized to empty secrets on both sides.
func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
	if c.Bucket != other.Bucket {
		return false
	}
	if c.KeyPrefix != other.KeyPrefix {
		return false
	}
	if c.AutomaticCredentials != other.AutomaticCredentials {
		return false
	}
	if c.StorageClass != other.StorageClass {
		return false
	}
	if c.ACL != other.ACL {
		return false
	}
	if c.UploadPartSize != other.UploadPartSize {
		return false
	}
	if c.UploadPartMaxTime != other.UploadPartMaxTime {
		return false
	}
	if c.Credentials == nil {
		c.Credentials = kms.NewEmptySecret()
	}
	if other.Credentials == nil {
		other.Credentials = kms.NewEmptySecret()
	}
	return c.Credentials.IsEqual(other.Credentials)
}
// isSameResource reports whether c and other point to the same bucket.
func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
	return c.Bucket == other.Bucket
}
// validate returns an error if the configuration is not valid.
// It also normalizes the configuration: with automatic credentials the
// stored credentials are reset to an empty secret, the key prefix is
// cleaned and forced to end with "/", storage class and ACL are trimmed
// and negative tuning values are reset to 0.
func (c *GCSFsConfig) validate() error {
	if c.Credentials == nil || c.AutomaticCredentials == 1 {
		c.Credentials = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	if c.KeyPrefix != "" {
		// the prefix must be a relative "folder": no leading "/", always a
		// trailing "/"
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
		return errors.New("invalid encrypted credentials")
	}
	// explicit credentials are required unless automatic ones are enabled
	if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
		return util.NewI18nError(errors.New("invalid credentials"), util.I18nErrorFsCredentialsRequired)
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	if c.UploadPartSize < 0 {
		c.UploadPartSize = 0
	}
	if c.UploadPartMaxTime < 0 {
		c.UploadPartMaxTime = 0
	}
	return nil
}
// AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
type AzBlobFsConfig struct {
	sdk.BaseAzBlobFsConfig
	// Storage Account Key leave blank to use SAS URL.
	// The access key is stored encrypted based on the kms configuration
	AccountKey *kms.Secret `json:"account_key,omitempty"`
	// Shared access signature URL, leave blank if using account/key
	SASURL *kms.Secret `json:"sas_url,omitempty"`
}
// HideConfidentialData hides confidential data
func (c *AzBlobFsConfig) HideConfidentialData() {
	if c.AccountKey != nil {
		c.AccountKey.Hide()
	}
	if c.SASURL != nil {
		c.SASURL.Hide()
	}
}
  537. func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
  538. if c.Container != other.Container {
  539. return false
  540. }
  541. if c.AccountName != other.AccountName {
  542. return false
  543. }
  544. if c.Endpoint != other.Endpoint {
  545. return false
  546. }
  547. if c.SASURL.IsEmpty() {
  548. c.SASURL = kms.NewEmptySecret()
  549. }
  550. if other.SASURL.IsEmpty() {
  551. other.SASURL = kms.NewEmptySecret()
  552. }
  553. if !c.SASURL.IsEqual(other.SASURL) {
  554. return false
  555. }
  556. if c.KeyPrefix != other.KeyPrefix {
  557. return false
  558. }
  559. if c.UploadPartSize != other.UploadPartSize {
  560. return false
  561. }
  562. if c.UploadConcurrency != other.UploadConcurrency {
  563. return false
  564. }
  565. if c.DownloadPartSize != other.DownloadPartSize {
  566. return false
  567. }
  568. if c.DownloadConcurrency != other.DownloadConcurrency {
  569. return false
  570. }
  571. if c.UseEmulator != other.UseEmulator {
  572. return false
  573. }
  574. if c.AccessTier != other.AccessTier {
  575. return false
  576. }
  577. return c.isSecretEqual(other)
  578. }
// isSecretEqual compares the account keys, normalizing nil secrets to
// empty ones on both sides first (this mutates both receivers).
func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
	if c.AccountKey == nil {
		c.AccountKey = kms.NewEmptySecret()
	}
	if other.AccountKey == nil {
		other.AccountKey = kms.NewEmptySecret()
	}
	return c.AccountKey.IsEqual(other.AccountKey)
}
// ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text.
// Both the account key and the SAS URL are encrypted when plain, using
// additionalData as extra authenticated data. Validation errors are wrapped
// preserving any i18n message from validate.
func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.AccountKey.IsPlain() {
		c.AccountKey.SetAdditionalData(additionalData)
		if err := c.AccountKey.Encrypt(); err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	if c.SASURL.IsPlain() {
		c.SASURL.SetAdditionalData(additionalData)
		if err := c.SASURL.Encrypt(); err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}
// checkCredentials validates the configured authentication data.
// A plain SAS URL only needs to parse as a URL; if any SAS URL is set it
// takes precedence over account credentials. Without a SAS URL, an account
// name and a valid account key are required.
func (c *AzBlobFsConfig) checkCredentials() error {
	if c.SASURL.IsPlain() {
		_, err := url.Parse(c.SASURL.GetPayload())
		if err != nil {
			return util.NewI18nError(err, util.I18nErrorSASURLInvalid)
		}
		return nil
	}
	if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
		return errors.New("invalid encrypted sas_url")
	}
	if !c.SASURL.IsEmpty() {
		// a non-empty, valid SAS URL is enough
		return nil
	}
	if c.AccountName == "" || !c.AccountKey.IsValidInput() {
		return util.NewI18nError(errors.New("credentials cannot be empty or invalid"), util.I18nErrorAccountNameRequired)
	}
	if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
		return errors.New("invalid encrypted account_key")
	}
	return nil
}
  640. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  641. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  642. return util.NewI18nError(
  643. fmt.Errorf("invalid upload part size: %v", c.UploadPartSize),
  644. util.I18nErrorULPartSizeInvalid,
  645. )
  646. }
  647. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  648. return util.NewI18nError(
  649. fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
  650. util.I18nErrorULConcurrencyInvalid,
  651. )
  652. }
  653. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  654. return util.NewI18nError(
  655. fmt.Errorf("invalid download part size: %v", c.DownloadPartSize),
  656. util.I18nErrorDLPartSizeInvalid,
  657. )
  658. }
  659. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  660. return util.NewI18nError(
  661. fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency),
  662. util.I18nErrorDLConcurrencyInvalid,
  663. )
  664. }
  665. return nil
  666. }
// tryDecrypt attempts to decrypt the account key and the SAS URL in place,
// returning a wrapped error naming the secret that failed.
func (c *AzBlobFsConfig) tryDecrypt() error {
	if err := c.AccountKey.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt account key: %w", err)
	}
	if err := c.SASURL.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt SAS URL: %w", err)
	}
	return nil
}
  676. func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
  677. if c.AccountName != other.AccountName {
  678. return false
  679. }
  680. if c.Endpoint != other.Endpoint {
  681. return false
  682. }
  683. return c.SASURL.GetPayload() == other.SASURL.GetPayload()
  684. }
// validate returns an error if the configuration is not valid.
// It also normalizes the configuration: nil secrets become empty ones and
// the key prefix is cleaned and forced to end with "/".
func (c *AzBlobFsConfig) validate() error {
	if c.AccountKey == nil {
		c.AccountKey = kms.NewEmptySecret()
	}
	if c.SASURL == nil {
		c.SASURL = kms.NewEmptySecret()
	}
	// container could be embedded within SAS URL we check this at runtime
	if c.SASURL.IsEmpty() && c.Container == "" {
		return util.NewI18nError(errors.New("container cannot be empty"), util.I18nErrorContainerRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		// the prefix must be a relative "folder": no leading "/", always a
		// trailing "/"
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if err := c.checkPartSizeAndConcurrency(); err != nil {
		return err
	}
	if !slices.Contains(validAzAccessTier, c.AccessTier) {
		return fmt.Errorf("invalid access tier %q, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
	}
	return nil
}
// CryptFsConfig defines the configuration to store local files as encrypted
type CryptFsConfig struct {
	sdk.OSFsConfig
	// Passphrase used to derive the encryption key, stored encrypted based
	// on the kms configuration.
	Passphrase *kms.Secret `json:"passphrase,omitempty"`
}
// HideConfidentialData hides confidential data
func (c *CryptFsConfig) HideConfidentialData() {
	if c.Passphrase != nil {
		c.Passphrase.Hide()
	}
}
// isEqual reports whether the two passphrases are equal, normalizing nil
// passphrases to empty secrets on both sides first (this mutates both
// receivers).
func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
	if c.Passphrase == nil {
		c.Passphrase = kms.NewEmptySecret()
	}
	if other.Passphrase == nil {
		other.Passphrase = kms.NewEmptySecret()
	}
	return c.Passphrase.IsEqual(other.Passphrase)
}
// ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text.
// additionalData is mixed into the encryption as extra authenticated data.
// Validation errors are wrapped preserving any i18n message from validate.
func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate crypt fs config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.Passphrase.IsPlain() {
		c.Passphrase.SetAdditionalData(additionalData)
		if err := c.Passphrase.Encrypt(); err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}
// isSameResource reports whether c and other use the same passphrase payload.
func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
	return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
}
// validate returns an error if the configuration is not valid.
// The passphrase is required and must be a valid, decryptable secret.
func (c *CryptFsConfig) validate() error {
	if c.Passphrase == nil || c.Passphrase.IsEmpty() {
		return util.NewI18nError(errors.New("invalid passphrase"), util.I18nErrorPassphraseRequired)
	}
	if !c.Passphrase.IsValidInput() {
		return util.NewI18nError(errors.New("passphrase cannot be empty or invalid"), util.I18nErrorPassphraseRequired)
	}
	if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
		return errors.New("invalid encrypted passphrase")
	}
	return nil
}
  774. // pipeWriter defines a wrapper for pipeat.PipeWriterAt.
  775. type pipeWriter struct {
  776. *pipeat.PipeWriterAt
  777. err error
  778. done chan bool
  779. }
  780. // NewPipeWriter initializes a new PipeWriter
  781. func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter {
  782. return &pipeWriter{
  783. PipeWriterAt: w,
  784. err: nil,
  785. done: make(chan bool),
  786. }
  787. }
// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
func (p *pipeWriter) Close() error {
	p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null
	// block until Done publishes the upload outcome in p.err
	<-p.done
	return p.err
}
// Done unlocks other goroutines waiting on Close().
// It must be called when the upload ends
func (p *pipeWriter) Done(err error) {
	// store the outcome before signaling so Close observes it after the receive
	p.err = err
	p.done <- true
}
  800. func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter {
  801. return &pipeWriterAtOffset{
  802. pipeWriter: &pipeWriter{
  803. PipeWriterAt: w,
  804. err: nil,
  805. done: make(chan bool),
  806. },
  807. offset: offset,
  808. writeOffset: offset,
  809. }
  810. }
// pipeWriterAtOffset is a pipeWriter that accepts writes starting from a base offset.
type pipeWriterAtOffset struct {
	*pipeWriter
	offset int64 // minimum accepted absolute offset; subtracted before writing to the pipe
	writeOffset int64 // current position used by Write, starts at offset
}
  816. func (p *pipeWriterAtOffset) WriteAt(buf []byte, off int64) (int, error) {
  817. if off < p.offset {
  818. return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, p.offset)
  819. }
  820. return p.pipeWriter.WriteAt(buf, off-p.offset)
  821. }
  822. func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
  823. n, err := p.WriteAt(buf, p.writeOffset)
  824. p.writeOffset += int64(n)
  825. return n, err
  826. }
  827. // NewPipeReader initializes a new PipeReader
  828. func NewPipeReader(r *pipeat.PipeReaderAt) PipeReader {
  829. return &pipeReader{
  830. PipeReaderAt: r,
  831. }
  832. }
// pipeReader defines a wrapper for pipeat.PipeReaderAt.
type pipeReader struct {
	*pipeat.PipeReaderAt
	mu sync.RWMutex // guards metadata
	metadata map[string]string // optional object metadata exposed via Metadata()
}
  839. func (p *pipeReader) setMetadata(value map[string]string) {
  840. p.mu.Lock()
  841. defer p.mu.Unlock()
  842. p.metadata = value
  843. }
  844. func (p *pipeReader) setMetadataFromPointerVal(value map[string]*string) {
  845. p.mu.Lock()
  846. defer p.mu.Unlock()
  847. if len(value) == 0 {
  848. p.metadata = nil
  849. return
  850. }
  851. p.metadata = map[string]string{}
  852. for k, v := range value {
  853. val := util.GetStringFromPointer(v)
  854. if val != "" {
  855. p.metadata[k] = val
  856. }
  857. }
  858. }
  859. // Metadata implements the Metadater interface
  860. func (p *pipeReader) Metadata() map[string]string {
  861. p.mu.RLock()
  862. defer p.mu.RUnlock()
  863. if len(p.metadata) == 0 {
  864. return nil
  865. }
  866. result := make(map[string]string)
  867. for k, v := range p.metadata {
  868. result[k] = v
  869. }
  870. return result
  871. }
  872. func isEqualityCheckModeValid(mode int) bool {
  873. return mode >= 0 || mode <= 1
  874. }
  875. // isDirectory checks if a path exists and is a directory
  876. func isDirectory(fs Fs, path string) (bool, error) {
  877. fileInfo, err := fs.Stat(path)
  878. if err != nil {
  879. return false, err
  880. }
  881. return fileInfo.IsDir(), err
  882. }
  883. // IsLocalOsFs returns true if fs is a local filesystem implementation
  884. func IsLocalOsFs(fs Fs) bool {
  885. return fs.Name() == osFsName
  886. }
  887. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  888. func IsCryptOsFs(fs Fs) bool {
  889. return fs.Name() == cryptFsName
  890. }
  891. // IsSFTPFs returns true if fs is an SFTP filesystem
  892. func IsSFTPFs(fs Fs) bool {
  893. return strings.HasPrefix(fs.Name(), sftpFsName)
  894. }
  895. // IsHTTPFs returns true if fs is an HTTP filesystem
  896. func IsHTTPFs(fs Fs) bool {
  897. return strings.HasPrefix(fs.Name(), httpFsName)
  898. }
  899. // IsBufferedLocalOrSFTPFs returns true if this is a buffered SFTP or local filesystem
  900. func IsBufferedLocalOrSFTPFs(fs Fs) bool {
  901. if osFs, ok := fs.(*OsFs); ok {
  902. return osFs.writeBufferSize > 0
  903. }
  904. if !IsSFTPFs(fs) {
  905. return false
  906. }
  907. return !fs.IsUploadResumeSupported()
  908. }
  909. // FsOpenReturnsFile returns true if fs.Open returns a *os.File handle
  910. func FsOpenReturnsFile(fs Fs) bool {
  911. if osFs, ok := fs.(*OsFs); ok {
  912. return osFs.readBufferSize == 0
  913. }
  914. if sftpFs, ok := fs.(*SFTPFs); ok {
  915. return sftpFs.config.BufferSize == 0
  916. }
  917. return false
  918. }
  919. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  920. func IsLocalOrSFTPFs(fs Fs) bool {
  921. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  922. }
  923. // HasTruncateSupport returns true if the fs supports truncate files
  924. func HasTruncateSupport(fs Fs) bool {
  925. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  926. }
  927. // IsRenameAtomic returns true if renaming a directory is supposed to be atomic
  928. func IsRenameAtomic(fs Fs) bool {
  929. if strings.HasPrefix(fs.Name(), s3fsName) {
  930. return false
  931. }
  932. if strings.HasPrefix(fs.Name(), gcsfsName) {
  933. return false
  934. }
  935. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  936. return false
  937. }
  938. return true
  939. }
  940. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  941. func HasImplicitAtomicUploads(fs Fs) bool {
  942. if strings.HasPrefix(fs.Name(), s3fsName) {
  943. return uploadMode&4 == 0
  944. }
  945. if strings.HasPrefix(fs.Name(), gcsfsName) {
  946. return uploadMode&8 == 0
  947. }
  948. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  949. return uploadMode&16 == 0
  950. }
  951. return false
  952. }
  953. // HasOpenRWSupport returns true if the fs can open a file
  954. // for reading and writing at the same time
  955. func HasOpenRWSupport(fs Fs) bool {
  956. if IsLocalOsFs(fs) {
  957. return true
  958. }
  959. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  960. return true
  961. }
  962. return false
  963. }
  964. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  965. func IsLocalOrCryptoFs(fs Fs) bool {
  966. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  967. }
  968. // SetPathPermissions calls fs.Chown.
  969. // It does nothing for local filesystem on windows
  970. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  971. if uid == -1 && gid == -1 {
  972. return
  973. }
  974. if IsLocalOsFs(fs) {
  975. if runtime.GOOS == "windows" {
  976. return
  977. }
  978. }
  979. if err := fs.Chown(path, uid, gid); err != nil {
  980. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  981. }
  982. }
  983. // IsUploadResumeSupported returns true if resuming uploads is supported
  984. func IsUploadResumeSupported(fs Fs, size int64) bool {
  985. if fs.IsUploadResumeSupported() {
  986. return true
  987. }
  988. return fs.IsConditionalUploadResumeSupported(size)
  989. }
  990. func getLastModified(metadata map[string]string) int64 {
  991. if val, ok := metadata[lastModifiedField]; ok {
  992. lastModified, err := strconv.ParseInt(val, 10, 64)
  993. if err == nil {
  994. return lastModified
  995. }
  996. }
  997. return 0
  998. }
  999. func getAzureLastModified(metadata map[string]*string) int64 {
  1000. for k, v := range metadata {
  1001. if strings.ToLower(k) == lastModifiedField {
  1002. if val := util.GetStringFromPointer(v); val != "" {
  1003. lastModified, err := strconv.ParseInt(val, 10, 64)
  1004. if err == nil {
  1005. return lastModified
  1006. }
  1007. }
  1008. return 0
  1009. }
  1010. }
  1011. return 0
  1012. }
  1013. func validateOSFsConfig(config *sdk.OSFsConfig) error {
  1014. if config.ReadBufferSize < 0 || config.ReadBufferSize > 10 {
  1015. return fmt.Errorf("invalid read buffer size must be between 0 and 10 MB")
  1016. }
  1017. if config.WriteBufferSize < 0 || config.WriteBufferSize > 10 {
  1018. return fmt.Errorf("invalid write buffer size must be between 0 and 10 MB")
  1019. }
  1020. return nil
  1021. }
  1022. func doCopy(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
  1023. if buf == nil {
  1024. buf = make([]byte, 32768)
  1025. }
  1026. for {
  1027. nr, er := src.Read(buf)
  1028. if nr > 0 {
  1029. nw, ew := dst.Write(buf[0:nr])
  1030. if nw < 0 || nr < nw {
  1031. nw = 0
  1032. if ew == nil {
  1033. ew = errors.New("invalid write")
  1034. }
  1035. }
  1036. written += int64(nw)
  1037. if ew != nil {
  1038. err = ew
  1039. break
  1040. }
  1041. if nr != nw {
  1042. err = io.ErrShortWrite
  1043. break
  1044. }
  1045. }
  1046. if er != nil {
  1047. if er != io.EOF {
  1048. err = er
  1049. }
  1050. break
  1051. }
  1052. }
  1053. return written, err
  1054. }
  1055. func getMountPath(mountPath string) string {
  1056. if mountPath == "/" {
  1057. return ""
  1058. }
  1059. return mountPath
  1060. }
  1061. func getLocalTempDir() string {
  1062. if tempPath != "" {
  1063. return tempPath
  1064. }
  1065. return filepath.Clean(os.TempDir())
  1066. }
// doRecursiveRename renames the content of the source directory into target by
// listing source in batches and invoking renameFn for each entry. renameFn
// receives the source path, the target path, the entry info and the current
// recursion depth, and returns the number of files renamed and their total
// size. Entries that disappear during the walk (fs.IsNotExist) are skipped.
// It returns the accumulated file count and size, and the first error, if any.
func doRecursiveRename(fs Fs, source, target string,
renameFn func(string, string, os.FileInfo, int) (int, int64, error),
recursion int,
) (int, int64, error) {
var numFiles int
var filesSize int64
// guard against cycles / pathological nesting
if recursion > util.MaxRecursion {
return numFiles, filesSize, util.ErrRecursionTooDeep
}
recursion++
lister, err := fs.ReadDir(source)
if err != nil {
return numFiles, filesSize, err
}
defer lister.Close()
for {
entries, err := lister.Next(ListerBatchSize)
// io.EOF means the listing is complete; process any final entries first
finished := errors.Is(err, io.EOF)
if err != nil && !finished {
return numFiles, filesSize, err
}
for _, info := range entries {
sourceEntry := fs.Join(source, info.Name())
targetEntry := fs.Join(target, info.Name())
files, size, err := renameFn(sourceEntry, targetEntry, info, recursion)
if err != nil {
if fs.IsNotExist(err) {
// entry vanished between listing and rename: best-effort skip
fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
continue
}
return numFiles, filesSize, err
}
numFiles += files
filesSize += size
}
if finished {
return numFiles, filesSize, nil
}
}
}
// fsLog logs a message at the given level using the filesystem name and
// connection ID as the log context.
func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
}