vfs.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "slices"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "github.com/eikenb/pipeat"
  31. "github.com/pkg/sftp"
  32. "github.com/sftpgo/sdk"
  33. "github.com/drakkan/sftpgo/v2/internal/kms"
  34. "github.com/drakkan/sftpgo/v2/internal/logger"
  35. "github.com/drakkan/sftpgo/v2/internal/util"
  36. )
const (
	// dirMimeType is the MIME type reported for directories.
	dirMimeType = "inode/directory"
	// Backend implementation names, returned by Fs.Name().
	s3fsName     = "S3Fs"
	gcsfsName    = "GCSFs"
	azBlobFsName = "AzureBlobFs"
	// lastModifiedField is the metadata key used to record the last
	// modification time (see its use with Metadater/PipeReader metadata).
	lastModifiedField = "sftpgo_last_modified"
	// preResumeTimeout bounds the wait before a resumed upload must start.
	preResumeTimeout = 90 * time.Second
	// ListerBatchSize defines the default limit for DirLister implementations
	ListerBatchSize = 1000
)
// Additional checks for files.
// These values are powers of two and are combined as a bitmask in the
// "checks" parameter of Fs.Create and Fs.Rename.
const (
	// CheckParentDir requests a check on the parent directory.
	CheckParentDir = 1
	// CheckResume marks the operation as an upload resume.
	CheckResume = 2
	// CheckUpdateModTime requests a modification time update.
	CheckUpdateModTime = 4
)
var (
	// validAzAccessTier lists the accepted Azure Blob access tiers
	// ("" means: use the account default).
	validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
	// ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
	ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
	// ErrVfsUnsupported defines the error for an unsupported VFS operation
	ErrVfsUnsupported = errors.New("not supported")
	// errInvalidDirListerLimit is returned by DirLister.Next for a non-positive limit.
	errInvalidDirListerLimit = errors.New("dir lister: invalid limit, must be > 0")
	// Package-level settings, configured via the Set* functions below.
	// NOTE(review): they are written without synchronization; presumably
	// set once at startup before any fs is used — confirm with callers.
	tempPath             string
	sftpFingerprints     []string
	allowSelfConnections int
	renameMode           int
	readMetadata         int
	resumeMaxSize        int64
	uploadMode           int
)
// SetAllowSelfConnections sets the desired behaviour for self connections.
// NOTE(review): none of these setters are synchronized; they look intended
// for one-time configuration at startup — confirm against callers.
func SetAllowSelfConnections(value int) {
	allowSelfConnections = value
}

// SetTempPath sets the path for temporary files
func SetTempPath(fsPath string) {
	tempPath = fsPath
}

// GetTempPath returns the path for temporary files
func GetTempPath() string {
	return tempPath
}

// SetSFTPFingerprints sets the SFTP host key fingerprints
func SetSFTPFingerprints(fp []string) {
	sftpFingerprints = fp
}

// SetRenameMode sets the rename mode
func SetRenameMode(val int) {
	renameMode = val
}

// SetReadMetadataMode sets the read metadata mode
func SetReadMetadataMode(val int) {
	readMetadata = val
}

// SetResumeMaxSize sets the max size allowed for resuming uploads for backends
// with immutable objects
func SetResumeMaxSize(val int64) {
	resumeMaxSize = val
}

// SetUploadMode sets the upload mode
func SetUploadMode(val int) {
	uploadMode = val
}
// Fs defines the interface for filesystem backends
type Fs interface {
	// Name identifies the backend implementation (see s3fsName and the
	// other *fsName constants above).
	Name() string
	ConnectionID() string
	Stat(name string) (os.FileInfo, error)
	Lstat(name string) (os.FileInfo, error)
	// Open opens the named file for reading from the given offset.
	// A backend returns either a File or a PipeReader; the returned
	// func, when not nil, must be called to release resources.
	Open(name string, offset int64) (File, PipeReader, func(), error)
	// Create opens the named file for writing. flag is an os.O_* bitmask,
	// checks is a bitmask of the Check* constants defined in this package.
	Create(name string, flag, checks int) (File, PipeWriter, func(), error)
	// Rename renames source to target; checks is a bitmask of the Check*
	// constants. NOTE(review): the (int, int64) results appear to be a
	// count and a size, as in GetDirSize — confirm with implementations.
	Rename(source, target string, checks int) (int, int64, error)
	Remove(name string, isDir bool) error
	Mkdir(name string) error
	Symlink(source, target string) error
	Chown(name string, uid int, gid int) error
	Chmod(name string, mode os.FileMode) error
	Chtimes(name string, atime, mtime time.Time, isUploading bool) error
	Truncate(name string, size int64) error
	// ReadDir returns a DirLister for the given directory.
	ReadDir(dirname string) (DirLister, error)
	Readlink(name string) (string, error)
	// Capability queries.
	IsUploadResumeSupported() bool
	IsConditionalUploadResumeSupported(size int64) bool
	IsAtomicUploadSupported() bool
	CheckRootPath(username string, uid int, gid int) bool
	// ResolvePath converts a virtual path to a backend path.
	ResolvePath(virtualPath string) (string, error)
	// Backend-specific error classification helpers.
	IsNotExist(err error) bool
	IsPermission(err error) bool
	IsNotSupported(err error) bool
	ScanRootDirContents() (int, int64, error)
	GetDirSize(dirname string) (int, int64, error)
	GetAtomicUploadPath(name string) string
	GetRelativePath(name string) string
	Walk(root string, walkFn filepath.WalkFunc) error
	Join(elem ...string) string
	HasVirtualFolders() bool
	GetMimeType(name string) (string, error)
	GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
	Close() error
}
// FsRealPather is a Fs that implements the RealPath method.
type FsRealPather interface {
	Fs
	// RealPath returns the real backend path for p.
	RealPath(p string) (string, error)
}

// FsFileCopier is a Fs that implements the CopyFile method.
type FsFileCopier interface {
	Fs
	// CopyFile copies source to target. NOTE(review): the (int, int64)
	// results appear to be a count and a size — confirm with implementations.
	CopyFile(source, target string, srcInfo os.FileInfo) (int, int64, error)
}
// File defines an interface representing a SFTPGo file.
// It combines the standard read/write/seek interfaces with the subset of
// *os.File methods used by the callers.
type File interface {
	io.Reader
	io.Writer
	io.Closer
	io.ReaderAt
	io.WriterAt
	io.Seeker
	Stat() (os.FileInfo, error)
	Name() string
	Truncate(size int64) error
}
// PipeWriter defines an interface representing a SFTPGo pipe writer
type PipeWriter interface {
	io.Writer
	io.WriterAt
	io.Closer
	// Done signals that the upload ended with the given error, unblocking
	// Close (see the pipeWriter implementation below).
	Done(err error)
	// GetWrittenBytes returns the number of bytes written so far.
	GetWrittenBytes() int64
}

// PipeReader defines an interface representing a SFTPGo pipe reader
type PipeReader interface {
	io.Reader
	io.ReaderAt
	io.Closer
	// setMetadata replaces the associated metadata (unexported: for
	// backend implementations in this package).
	setMetadata(value map[string]string)
	// setMetadataFromPointerVal is like setMetadata for backends that
	// expose metadata as map[string]*string.
	setMetadataFromPointerVal(value map[string]*string)
	// Metadata returns the associated metadata.
	Metadata() map[string]string
}
// DirLister defines an interface for a directory lister
type DirLister interface {
	// Next returns at most limit entries; it returns io.EOF when the
	// listing is exhausted.
	Next(limit int) ([]os.FileInfo, error)
	Close() error
}

// Metadater defines an interface to implement to return metadata for a file
type Metadater interface {
	Metadata() map[string]string
}
  186. type baseDirLister struct {
  187. cache []os.FileInfo
  188. }
  189. func (l *baseDirLister) Next(limit int) ([]os.FileInfo, error) {
  190. if limit <= 0 {
  191. return nil, errInvalidDirListerLimit
  192. }
  193. if len(l.cache) >= limit {
  194. return l.returnFromCache(limit), nil
  195. }
  196. return l.returnFromCache(limit), io.EOF
  197. }
  198. func (l *baseDirLister) returnFromCache(limit int) []os.FileInfo {
  199. if len(l.cache) >= limit {
  200. result := l.cache[:limit]
  201. l.cache = l.cache[limit:]
  202. return result
  203. }
  204. result := l.cache
  205. l.cache = nil
  206. return result
  207. }
  208. func (l *baseDirLister) Close() error {
  209. l.cache = nil
  210. return nil
  211. }
  212. // QuotaCheckResult defines the result for a quota check
  213. type QuotaCheckResult struct {
  214. HasSpace bool
  215. AllowedSize int64
  216. AllowedFiles int
  217. UsedSize int64
  218. UsedFiles int
  219. QuotaSize int64
  220. QuotaFiles int
  221. }
  222. // GetRemainingSize returns the remaining allowed size
  223. func (q *QuotaCheckResult) GetRemainingSize() int64 {
  224. if q.QuotaSize > 0 {
  225. return q.QuotaSize - q.UsedSize
  226. }
  227. return 0
  228. }
  229. // GetRemainingFiles returns the remaining allowed files
  230. func (q *QuotaCheckResult) GetRemainingFiles() int {
  231. if q.QuotaFiles > 0 {
  232. return q.QuotaFiles - q.UsedFiles
  233. }
  234. return 0
  235. }
// S3FsConfig defines the configuration for S3 based filesystem
type S3FsConfig struct {
	sdk.BaseS3FsConfig
	// AccessSecret is stored encrypted (kms.Secret).
	AccessSecret *kms.Secret `json:"access_secret,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *S3FsConfig) HideConfidentialData() {
	if c.AccessSecret != nil {
		c.AccessSecret.Hide()
	}
}
  247. func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
  248. if c.Bucket != other.Bucket {
  249. return false
  250. }
  251. if c.KeyPrefix != other.KeyPrefix {
  252. return false
  253. }
  254. if c.Region != other.Region {
  255. return false
  256. }
  257. if c.AccessKey != other.AccessKey {
  258. return false
  259. }
  260. if c.RoleARN != other.RoleARN {
  261. return false
  262. }
  263. if c.Endpoint != other.Endpoint {
  264. return false
  265. }
  266. if c.StorageClass != other.StorageClass {
  267. return false
  268. }
  269. if c.ACL != other.ACL {
  270. return false
  271. }
  272. if !c.areMultipartFieldsEqual(other) {
  273. return false
  274. }
  275. if c.ForcePathStyle != other.ForcePathStyle {
  276. return false
  277. }
  278. if c.SkipTLSVerify != other.SkipTLSVerify {
  279. return false
  280. }
  281. return c.isSecretEqual(other)
  282. }
  283. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  284. if c.UploadPartSize != other.UploadPartSize {
  285. return false
  286. }
  287. if c.UploadConcurrency != other.UploadConcurrency {
  288. return false
  289. }
  290. if c.DownloadConcurrency != other.DownloadConcurrency {
  291. return false
  292. }
  293. if c.DownloadPartSize != other.DownloadPartSize {
  294. return false
  295. }
  296. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  297. return false
  298. }
  299. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  300. return false
  301. }
  302. return true
  303. }
  304. func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
  305. if c.AccessSecret == nil {
  306. c.AccessSecret = kms.NewEmptySecret()
  307. }
  308. if other.AccessSecret == nil {
  309. other.AccessSecret = kms.NewEmptySecret()
  310. }
  311. return c.AccessSecret.IsEqual(other.AccessSecret)
  312. }
// checkCredentials validates the access key/secret pair: key and secret
// must be set together (both may be empty, e.g. when a role ARN is used)
// and the secret must be in a valid state.
func (c *S3FsConfig) checkCredentials() error {
	if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
		return util.NewI18nError(
			errors.New("access_key cannot be empty with access_secret not empty"),
			util.I18nErrorAccessKeyRequired,
		)
	}
	if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
		return util.NewI18nError(
			errors.New("access_secret cannot be empty with access_key not empty"),
			util.I18nErrorAccessSecretRequired,
		)
	}
	if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
		return errors.New("invalid encrypted access_secret")
	}
	if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
		return errors.New("invalid access_secret")
	}
	return nil
}
  334. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  335. func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  336. if err := c.validate(); err != nil {
  337. var errI18n *util.I18nError
  338. errValidation := util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
  339. if errors.As(err, &errI18n) {
  340. return util.NewI18nError(errValidation, errI18n.Message)
  341. }
  342. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  343. }
  344. if c.AccessSecret.IsPlain() {
  345. c.AccessSecret.SetAdditionalData(additionalData)
  346. err := c.AccessSecret.Encrypt()
  347. if err != nil {
  348. return util.NewI18nError(
  349. util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err)),
  350. util.I18nErrorFsValidation,
  351. )
  352. }
  353. }
  354. return nil
  355. }
// checkPartSizeAndConcurrency validates multipart transfer settings:
// part sizes must be 0 (use the default) or within 5-5000 MB, and
// concurrency values must be within 0-64.
func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
	if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
		return util.NewI18nError(
			errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
			util.I18nErrorULPartSizeInvalid,
		)
	}
	if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
			util.I18nErrorULConcurrencyInvalid,
		)
	}
	if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
		return util.NewI18nError(
			errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
			util.I18nErrorDLPartSizeInvalid,
		)
	}
	if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency),
			util.I18nErrorDLConcurrencyInvalid,
		)
	}
	return nil
}
  383. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  384. if c.Bucket != other.Bucket {
  385. return false
  386. }
  387. if c.Endpoint != other.Endpoint {
  388. return false
  389. }
  390. return c.Region == other.Region
  391. }
// validate returns an error if the configuration is not valid.
// As a side effect it normalizes the config: nil secret becomes empty,
// KeyPrefix gets a trailing "/", StorageClass/ACL are trimmed.
func (c *S3FsConfig) validate() error {
	if c.AccessSecret == nil {
		c.AccessSecret = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	// the region may be embedded within the endpoint for some S3 compatible
	// object storage, for example B2
	if c.Endpoint == "" && c.Region == "" {
		return util.NewI18nError(errors.New("region cannot be empty"), util.I18nErrorRegionRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		// Normalize the prefix so it always ends with a single "/".
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	return c.checkPartSizeAndConcurrency()
}
// GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
type GCSFsConfig struct {
	sdk.BaseGCSFsConfig
	// Credentials is the service account JSON, stored encrypted (kms.Secret).
	Credentials *kms.Secret `json:"credentials,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *GCSFsConfig) HideConfidentialData() {
	if c.Credentials != nil {
		c.Credentials.Hide()
	}
}
// ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text.
// additionalData is bound to the encrypted secret (see kms.Secret.SetAdditionalData).
func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
		// Preserve the specific i18n message when available.
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.Credentials.IsPlain() {
		c.Credentials.SetAdditionalData(additionalData)
		err := c.Credentials.Encrypt()
		if err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}
  454. func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
  455. if c.Bucket != other.Bucket {
  456. return false
  457. }
  458. if c.KeyPrefix != other.KeyPrefix {
  459. return false
  460. }
  461. if c.AutomaticCredentials != other.AutomaticCredentials {
  462. return false
  463. }
  464. if c.StorageClass != other.StorageClass {
  465. return false
  466. }
  467. if c.ACL != other.ACL {
  468. return false
  469. }
  470. if c.UploadPartSize != other.UploadPartSize {
  471. return false
  472. }
  473. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  474. return false
  475. }
  476. if c.Credentials == nil {
  477. c.Credentials = kms.NewEmptySecret()
  478. }
  479. if other.Credentials == nil {
  480. other.Credentials = kms.NewEmptySecret()
  481. }
  482. return c.Credentials.IsEqual(other.Credentials)
  483. }
  484. func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
  485. return c.Bucket == other.Bucket
  486. }
// validate returns an error if the configuration is not valid.
// As a side effect it normalizes the config: credentials are reset when
// automatic credentials are enabled, KeyPrefix gets a trailing "/",
// StorageClass/ACL are trimmed and negative tunables are zeroed.
func (c *GCSFsConfig) validate() error {
	if c.Credentials == nil || c.AutomaticCredentials == 1 {
		c.Credentials = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	if c.KeyPrefix != "" {
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		// Normalize the prefix so it always ends with a single "/".
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
		return errors.New("invalid encrypted credentials")
	}
	// Explicit credentials are required when automatic credentials are off.
	if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
		return util.NewI18nError(errors.New("invalid credentials"), util.I18nErrorFsCredentialsRequired)
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	if c.UploadPartSize < 0 {
		c.UploadPartSize = 0
	}
	if c.UploadPartMaxTime < 0 {
		c.UploadPartMaxTime = 0
	}
	return nil
}
// AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
type AzBlobFsConfig struct {
	sdk.BaseAzBlobFsConfig
	// Storage Account Key leave blank to use SAS URL.
	// The access key is stored encrypted based on the kms configuration
	AccountKey *kms.Secret `json:"account_key,omitempty"`
	// Shared access signature URL, leave blank if using account/key
	SASURL *kms.Secret `json:"sas_url,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *AzBlobFsConfig) HideConfidentialData() {
	if c.AccountKey != nil {
		c.AccountKey.Hide()
	}
	if c.SASURL != nil {
		c.SASURL.Hide()
	}
}
  538. func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
  539. if c.Container != other.Container {
  540. return false
  541. }
  542. if c.AccountName != other.AccountName {
  543. return false
  544. }
  545. if c.Endpoint != other.Endpoint {
  546. return false
  547. }
  548. if c.SASURL.IsEmpty() {
  549. c.SASURL = kms.NewEmptySecret()
  550. }
  551. if other.SASURL.IsEmpty() {
  552. other.SASURL = kms.NewEmptySecret()
  553. }
  554. if !c.SASURL.IsEqual(other.SASURL) {
  555. return false
  556. }
  557. if c.KeyPrefix != other.KeyPrefix {
  558. return false
  559. }
  560. if c.UploadPartSize != other.UploadPartSize {
  561. return false
  562. }
  563. if c.UploadConcurrency != other.UploadConcurrency {
  564. return false
  565. }
  566. if c.DownloadPartSize != other.DownloadPartSize {
  567. return false
  568. }
  569. if c.DownloadConcurrency != other.DownloadConcurrency {
  570. return false
  571. }
  572. if c.UseEmulator != other.UseEmulator {
  573. return false
  574. }
  575. if c.AccessTier != other.AccessTier {
  576. return false
  577. }
  578. return c.isSecretEqual(other)
  579. }
  580. func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
  581. if c.AccountKey == nil {
  582. c.AccountKey = kms.NewEmptySecret()
  583. }
  584. if other.AccountKey == nil {
  585. other.AccountKey = kms.NewEmptySecret()
  586. }
  587. return c.AccountKey.IsEqual(other.AccountKey)
  588. }
  589. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  590. func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  591. if err := c.validate(); err != nil {
  592. var errI18n *util.I18nError
  593. errValidation := util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
  594. if errors.As(err, &errI18n) {
  595. return util.NewI18nError(errValidation, errI18n.Message)
  596. }
  597. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  598. }
  599. if c.AccountKey.IsPlain() {
  600. c.AccountKey.SetAdditionalData(additionalData)
  601. if err := c.AccountKey.Encrypt(); err != nil {
  602. return util.NewI18nError(
  603. util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err)),
  604. util.I18nErrorFsValidation,
  605. )
  606. }
  607. }
  608. if c.SASURL.IsPlain() {
  609. c.SASURL.SetAdditionalData(additionalData)
  610. if err := c.SASURL.Encrypt(); err != nil {
  611. return util.NewI18nError(
  612. util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err)),
  613. util.I18nErrorFsValidation,
  614. )
  615. }
  616. }
  617. return nil
  618. }
// checkCredentials validates the Azure credentials: either a parseable
// SAS URL, or an account name plus a valid account key.
func (c *AzBlobFsConfig) checkCredentials() error {
	if c.SASURL.IsPlain() {
		// A plain SAS URL must at least parse as a URL.
		_, err := url.Parse(c.SASURL.GetPayload())
		if err != nil {
			return util.NewI18nError(err, util.I18nErrorSASURLInvalid)
		}
		return nil
	}
	if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
		return errors.New("invalid encrypted sas_url")
	}
	// A non-empty (encrypted) SAS URL is enough, no account key needed.
	if !c.SASURL.IsEmpty() {
		return nil
	}
	if c.AccountName == "" || !c.AccountKey.IsValidInput() {
		return util.NewI18nError(errors.New("credentials cannot be empty or invalid"), util.I18nErrorAccountNameRequired)
	}
	if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
		return errors.New("invalid encrypted account_key")
	}
	return nil
}
  641. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  642. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  643. return util.NewI18nError(
  644. fmt.Errorf("invalid upload part size: %v", c.UploadPartSize),
  645. util.I18nErrorULPartSizeInvalid,
  646. )
  647. }
  648. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  649. return util.NewI18nError(
  650. fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
  651. util.I18nErrorULConcurrencyInvalid,
  652. )
  653. }
  654. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  655. return util.NewI18nError(
  656. fmt.Errorf("invalid download part size: %v", c.DownloadPartSize),
  657. util.I18nErrorDLPartSizeInvalid,
  658. )
  659. }
  660. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  661. return util.NewI18nError(
  662. fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency),
  663. util.I18nErrorDLConcurrencyInvalid,
  664. )
  665. }
  666. return nil
  667. }
// tryDecrypt attempts to decrypt the account key and the SAS URL in place,
// wrapping any failure with the field that could not be decrypted.
func (c *AzBlobFsConfig) tryDecrypt() error {
	if err := c.AccountKey.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt account key: %w", err)
	}
	if err := c.SASURL.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt SAS URL: %w", err)
	}
	return nil
}
  677. func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
  678. if c.AccountName != other.AccountName {
  679. return false
  680. }
  681. if c.Endpoint != other.Endpoint {
  682. return false
  683. }
  684. return c.SASURL.GetPayload() == other.SASURL.GetPayload()
  685. }
// validate returns an error if the configuration is not valid.
// As a side effect it normalizes the config: nil secrets become empty and
// KeyPrefix gets a trailing "/".
func (c *AzBlobFsConfig) validate() error {
	if c.AccountKey == nil {
		c.AccountKey = kms.NewEmptySecret()
	}
	if c.SASURL == nil {
		c.SASURL = kms.NewEmptySecret()
	}
	// container could be embedded within SAS URL we check this at runtime
	if c.SASURL.IsEmpty() && c.Container == "" {
		return util.NewI18nError(errors.New("container cannot be empty"), util.I18nErrorContainerRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		// Normalize the prefix so it always ends with a single "/".
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if err := c.checkPartSizeAndConcurrency(); err != nil {
		return err
	}
	if !slices.Contains(validAzAccessTier, c.AccessTier) {
		return fmt.Errorf("invalid access tier %q, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
	}
	return nil
}
// CryptFsConfig defines the configuration to store local files as encrypted
type CryptFsConfig struct {
	sdk.OSFsConfig
	// Passphrase used to derive the encryption key, stored encrypted (kms.Secret).
	Passphrase *kms.Secret `json:"passphrase,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *CryptFsConfig) HideConfidentialData() {
	if c.Passphrase != nil {
		c.Passphrase.Hide()
	}
}
  729. func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
  730. if c.Passphrase == nil {
  731. c.Passphrase = kms.NewEmptySecret()
  732. }
  733. if other.Passphrase == nil {
  734. other.Passphrase = kms.NewEmptySecret()
  735. }
  736. return c.Passphrase.IsEqual(other.Passphrase)
  737. }
  738. // ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
  739. func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  740. if err := c.validate(); err != nil {
  741. var errI18n *util.I18nError
  742. errValidation := util.NewValidationError(fmt.Sprintf("could not validate crypt fs config: %v", err))
  743. if errors.As(err, &errI18n) {
  744. return util.NewI18nError(errValidation, errI18n.Message)
  745. }
  746. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  747. }
  748. if c.Passphrase.IsPlain() {
  749. c.Passphrase.SetAdditionalData(additionalData)
  750. if err := c.Passphrase.Encrypt(); err != nil {
  751. return util.NewI18nError(
  752. util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err)),
  753. util.I18nErrorFsValidation,
  754. )
  755. }
  756. }
  757. return nil
  758. }
// isSameResource reports whether both configurations use the same passphrase.
func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
	return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
}
// validate returns an error if the configuration is not valid:
// the passphrase must be set, a valid input, and — when encrypted —
// in a valid state.
func (c *CryptFsConfig) validate() error {
	if c.Passphrase == nil || c.Passphrase.IsEmpty() {
		return util.NewI18nError(errors.New("invalid passphrase"), util.I18nErrorPassphraseRequired)
	}
	if !c.Passphrase.IsValidInput() {
		return util.NewI18nError(errors.New("passphrase cannot be empty or invalid"), util.I18nErrorPassphraseRequired)
	}
	if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
		return errors.New("invalid encrypted passphrase")
	}
	return nil
}
  775. // pipeWriter defines a wrapper for pipeat.PipeWriterAt.
  776. type pipeWriter struct {
  777. *pipeat.PipeWriterAt
  778. err error
  779. done chan bool
  780. }
  781. // NewPipeWriter initializes a new PipeWriter
  782. func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter {
  783. return &pipeWriter{
  784. PipeWriterAt: w,
  785. err: nil,
  786. done: make(chan bool),
  787. }
  788. }
// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
func (p *pipeWriter) Close() error {
	p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null
	// block until Done signals that the upload has finished
	<-p.done
	return p.err
}
// Done unlocks other goroutines waiting on Close().
// It must be called when the upload ends
func (p *pipeWriter) Done(err error) {
	// store the result before signaling: Close reads p.err after the receive
	p.err = err
	p.done <- true
}
  801. func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter {
  802. return &pipeWriterAtOffset{
  803. pipeWriter: &pipeWriter{
  804. PipeWriterAt: w,
  805. err: nil,
  806. done: make(chan bool),
  807. },
  808. offset: offset,
  809. writeOffset: offset,
  810. }
  811. }
// pipeWriterAtOffset is a pipeWriter that remaps write positions so the
// wrapped pipe sees offsets relative to a configured start offset.
type pipeWriterAtOffset struct {
	*pipeWriter
	// offset is the minimum accepted position, subtracted from every WriteAt offset
	offset int64
	// writeOffset is the next absolute position used by sequential Write calls
	writeOffset int64
}
  817. func (p *pipeWriterAtOffset) WriteAt(buf []byte, off int64) (int, error) {
  818. if off < p.offset {
  819. return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, p.offset)
  820. }
  821. return p.pipeWriter.WriteAt(buf, off-p.offset)
  822. }
  823. func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
  824. n, err := p.WriteAt(buf, p.writeOffset)
  825. p.writeOffset += int64(n)
  826. return n, err
  827. }
  828. // NewPipeReader initializes a new PipeReader
  829. func NewPipeReader(r *pipeat.PipeReaderAt) PipeReader {
  830. return &pipeReader{
  831. PipeReaderAt: r,
  832. }
  833. }
// pipeReader defines a wrapper for pipeat.PipeReaderAt.
type pipeReader struct {
	*pipeat.PipeReaderAt
	// mu protects metadata
	mu sync.RWMutex
	// metadata holds optional object metadata; only non-empty values are kept
	metadata map[string]string
}
  840. func (p *pipeReader) setMetadata(value map[string]string) {
  841. p.mu.Lock()
  842. defer p.mu.Unlock()
  843. p.metadata = value
  844. }
  845. func (p *pipeReader) setMetadataFromPointerVal(value map[string]*string) {
  846. p.mu.Lock()
  847. defer p.mu.Unlock()
  848. if len(value) == 0 {
  849. p.metadata = nil
  850. return
  851. }
  852. p.metadata = map[string]string{}
  853. for k, v := range value {
  854. val := util.GetStringFromPointer(v)
  855. if val != "" {
  856. p.metadata[k] = val
  857. }
  858. }
  859. }
  860. // Metadata implements the Metadater interface
  861. func (p *pipeReader) Metadata() map[string]string {
  862. p.mu.RLock()
  863. defer p.mu.RUnlock()
  864. if len(p.metadata) == 0 {
  865. return nil
  866. }
  867. result := make(map[string]string)
  868. for k, v := range p.metadata {
  869. result[k] = v
  870. }
  871. return result
  872. }
  873. func isEqualityCheckModeValid(mode int) bool {
  874. return mode >= 0 || mode <= 1
  875. }
  876. // isDirectory checks if a path exists and is a directory
  877. func isDirectory(fs Fs, path string) (bool, error) {
  878. fileInfo, err := fs.Stat(path)
  879. if err != nil {
  880. return false, err
  881. }
  882. return fileInfo.IsDir(), err
  883. }
  884. // IsLocalOsFs returns true if fs is a local filesystem implementation
  885. func IsLocalOsFs(fs Fs) bool {
  886. return fs.Name() == osFsName
  887. }
  888. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  889. func IsCryptOsFs(fs Fs) bool {
  890. return fs.Name() == cryptFsName
  891. }
  892. // IsSFTPFs returns true if fs is an SFTP filesystem
  893. func IsSFTPFs(fs Fs) bool {
  894. return strings.HasPrefix(fs.Name(), sftpFsName)
  895. }
  896. // IsHTTPFs returns true if fs is an HTTP filesystem
  897. func IsHTTPFs(fs Fs) bool {
  898. return strings.HasPrefix(fs.Name(), httpFsName)
  899. }
  900. // IsBufferedLocalOrSFTPFs returns true if this is a buffered SFTP or local filesystem
  901. func IsBufferedLocalOrSFTPFs(fs Fs) bool {
  902. if osFs, ok := fs.(*OsFs); ok {
  903. return osFs.writeBufferSize > 0
  904. }
  905. if !IsSFTPFs(fs) {
  906. return false
  907. }
  908. return !fs.IsUploadResumeSupported()
  909. }
  910. // FsOpenReturnsFile returns true if fs.Open returns a *os.File handle
  911. func FsOpenReturnsFile(fs Fs) bool {
  912. if osFs, ok := fs.(*OsFs); ok {
  913. return osFs.readBufferSize == 0
  914. }
  915. if sftpFs, ok := fs.(*SFTPFs); ok {
  916. return sftpFs.config.BufferSize == 0
  917. }
  918. return false
  919. }
  920. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  921. func IsLocalOrSFTPFs(fs Fs) bool {
  922. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  923. }
  924. // HasTruncateSupport returns true if the fs supports truncate files
  925. func HasTruncateSupport(fs Fs) bool {
  926. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  927. }
  928. // IsRenameAtomic returns true if renaming a directory is supposed to be atomic
  929. func IsRenameAtomic(fs Fs) bool {
  930. if strings.HasPrefix(fs.Name(), s3fsName) {
  931. return false
  932. }
  933. if strings.HasPrefix(fs.Name(), gcsfsName) {
  934. return false
  935. }
  936. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  937. return false
  938. }
  939. return true
  940. }
  941. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  942. func HasImplicitAtomicUploads(fs Fs) bool {
  943. if strings.HasPrefix(fs.Name(), s3fsName) {
  944. return uploadMode&4 == 0
  945. }
  946. if strings.HasPrefix(fs.Name(), gcsfsName) {
  947. return uploadMode&8 == 0
  948. }
  949. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  950. return uploadMode&16 == 0
  951. }
  952. return false
  953. }
  954. // HasOpenRWSupport returns true if the fs can open a file
  955. // for reading and writing at the same time
  956. func HasOpenRWSupport(fs Fs) bool {
  957. if IsLocalOsFs(fs) {
  958. return true
  959. }
  960. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  961. return true
  962. }
  963. return false
  964. }
  965. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  966. func IsLocalOrCryptoFs(fs Fs) bool {
  967. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  968. }
  969. // SetPathPermissions calls fs.Chown.
  970. // It does nothing for local filesystem on windows
  971. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  972. if uid == -1 && gid == -1 {
  973. return
  974. }
  975. if IsLocalOsFs(fs) {
  976. if runtime.GOOS == "windows" {
  977. return
  978. }
  979. }
  980. if err := fs.Chown(path, uid, gid); err != nil {
  981. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  982. }
  983. }
  984. // IsUploadResumeSupported returns true if resuming uploads is supported
  985. func IsUploadResumeSupported(fs Fs, size int64) bool {
  986. if fs.IsUploadResumeSupported() {
  987. return true
  988. }
  989. return fs.IsConditionalUploadResumeSupported(size)
  990. }
  991. func getLastModified(metadata map[string]string) int64 {
  992. if val, ok := metadata[lastModifiedField]; ok && val != "" {
  993. lastModified, err := strconv.ParseInt(val, 10, 64)
  994. if err == nil {
  995. return lastModified
  996. }
  997. }
  998. return 0
  999. }
  1000. func getAzureLastModified(metadata map[string]*string) int64 {
  1001. for k, v := range metadata {
  1002. if strings.ToLower(k) == lastModifiedField {
  1003. if val := util.GetStringFromPointer(v); val != "" {
  1004. lastModified, err := strconv.ParseInt(val, 10, 64)
  1005. if err == nil {
  1006. return lastModified
  1007. }
  1008. }
  1009. return 0
  1010. }
  1011. }
  1012. return 0
  1013. }
  1014. func validateOSFsConfig(config *sdk.OSFsConfig) error {
  1015. if config.ReadBufferSize < 0 || config.ReadBufferSize > 10 {
  1016. return fmt.Errorf("invalid read buffer size must be between 0 and 10 MB")
  1017. }
  1018. if config.WriteBufferSize < 0 || config.WriteBufferSize > 10 {
  1019. return fmt.Errorf("invalid write buffer size must be between 0 and 10 MB")
  1020. }
  1021. return nil
  1022. }
  1023. func doCopy(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
  1024. if buf == nil {
  1025. buf = make([]byte, 32768)
  1026. }
  1027. for {
  1028. nr, er := src.Read(buf)
  1029. if nr > 0 {
  1030. nw, ew := dst.Write(buf[0:nr])
  1031. if nw < 0 || nr < nw {
  1032. nw = 0
  1033. if ew == nil {
  1034. ew = errors.New("invalid write")
  1035. }
  1036. }
  1037. written += int64(nw)
  1038. if ew != nil {
  1039. err = ew
  1040. break
  1041. }
  1042. if nr != nw {
  1043. err = io.ErrShortWrite
  1044. break
  1045. }
  1046. }
  1047. if er != nil {
  1048. if er != io.EOF {
  1049. err = er
  1050. }
  1051. break
  1052. }
  1053. }
  1054. return written, err
  1055. }
  1056. func getMountPath(mountPath string) string {
  1057. if mountPath == "/" {
  1058. return ""
  1059. }
  1060. return mountPath
  1061. }
  1062. func getLocalTempDir() string {
  1063. if tempPath != "" {
  1064. return tempPath
  1065. }
  1066. return filepath.Clean(os.TempDir())
  1067. }
// doRecursiveRename renames the content of source into target by listing the
// source directory in batches and invoking renameFn on each entry.
// renameFn receives (sourcePath, targetPath, info, recursion, updateModTime)
// and is expected to perform the actual rename, recursing into directories.
// It returns the number of renamed files and their cumulative size.
func doRecursiveRename(fs Fs, source, target string,
	renameFn func(string, string, os.FileInfo, int, bool) (int, int64, error),
	recursion int, updateModTime bool,
) (int, int64, error) {
	var numFiles int
	var filesSize int64
	// bound the recursion depth to protect against overly deep trees
	if recursion > util.MaxRecursion {
		return numFiles, filesSize, util.ErrRecursionTooDeep
	}
	recursion++
	lister, err := fs.ReadDir(source)
	if err != nil {
		return numFiles, filesSize, err
	}
	defer lister.Close()
	for {
		// Next returns io.EOF together with the final batch of entries,
		// so the entries must be processed before honoring "finished"
		entries, err := lister.Next(ListerBatchSize)
		finished := errors.Is(err, io.EOF)
		if err != nil && !finished {
			return numFiles, filesSize, err
		}
		for _, info := range entries {
			sourceEntry := fs.Join(source, info.Name())
			targetEntry := fs.Join(target, info.Name())
			files, size, err := renameFn(sourceEntry, targetEntry, info, recursion, updateModTime)
			if err != nil {
				if fs.IsNotExist(err) {
					// the entry disappeared between listing and rename: skip it
					fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
					continue
				}
				return numFiles, filesSize, err
			}
			numFiles += files
			filesSize += size
		}
		if finished {
			return numFiles, filesSize, nil
		}
	}
}
// fsLog logs the given formatted message at the given level, using the
// filesystem name and connection ID as logging context.
func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
	logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
}