vfs.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "time"
  29. "github.com/eikenb/pipeat"
  30. "github.com/pkg/sftp"
  31. "github.com/sftpgo/sdk"
  32. "github.com/drakkan/sftpgo/v2/internal/kms"
  33. "github.com/drakkan/sftpgo/v2/internal/logger"
  34. "github.com/drakkan/sftpgo/v2/internal/util"
  35. )
const (
	// dirMimeType is the MIME type used to identify directory entries.
	dirMimeType = "inode/directory"
	// backend display names, returned by the Name() method of the
	// corresponding Fs implementations.
	s3fsName     = "S3Fs"
	gcsfsName    = "GCSFs"
	azBlobFsName = "AzureBlobFs"
	// lastModifiedField is an object metadata key; presumably used to
	// preserve the last modification time on object-storage backends —
	// NOTE(review): confirm against the backend implementations.
	lastModifiedField = "sftpgo_last_modified"
	// preResumeTimeout is a timeout related to resumed uploads; its use is
	// outside this chunk — NOTE(review): confirm exact semantics.
	preResumeTimeout = 90 * time.Second
	// ListerBatchSize defines the default limit for DirLister implementations
	ListerBatchSize = 1000
)

// Additional checks for files. Presumably combined into the "checks"
// argument of Fs.Create — NOTE(review): confirm against callers.
const (
	CheckParentDir = 1
	CheckResume    = 2
)
var (
	// validAzAccessTier lists the Azure access tiers accepted by
	// AzBlobFsConfig.validate; the empty string means "not set".
	validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
	// ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
	ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
	// ErrVfsUnsupported defines the error for an unsupported VFS operation
	ErrVfsUnsupported = errors.New("not supported")
	// errInvalidDirListerLimit is returned by DirLister.Next when limit <= 0.
	errInvalidDirListerLimit = errors.New("dir lister: invalid limit, must be > 0")
	// package-level settings, written via the Set* functions below —
	// NOTE(review): no locking, so they are presumably set once at startup.
	tempPath             string
	sftpFingerprints     []string
	allowSelfConnections int
	renameMode           int
	readMetadata         int
	resumeMaxSize        int64
	uploadMode           int
)
// SetAllowSelfConnections sets the desired behaviour for self connections.
// NOTE(review): these package-level setters perform no synchronization;
// they are presumably called once during initialization — confirm.
func SetAllowSelfConnections(value int) {
	allowSelfConnections = value
}

// SetTempPath sets the path for temporary files
func SetTempPath(fsPath string) {
	tempPath = fsPath
}

// GetTempPath returns the path for temporary files
func GetTempPath() string {
	return tempPath
}

// SetSFTPFingerprints sets the SFTP host key fingerprints
func SetSFTPFingerprints(fp []string) {
	sftpFingerprints = fp
}

// SetRenameMode sets the rename mode
func SetRenameMode(val int) {
	renameMode = val
}

// SetReadMetadataMode sets the read metadata mode
func SetReadMetadataMode(val int) {
	readMetadata = val
}

// SetResumeMaxSize sets the max size allowed for resuming uploads for backends
// with immutable objects
func SetResumeMaxSize(val int64) {
	resumeMaxSize = val
}

// SetUploadMode sets the upload mode
func SetUploadMode(val int) {
	uploadMode = val
}
// Fs defines the interface for filesystem backends
type Fs interface {
	// Name returns the backend name (see the *fsName constants above).
	Name() string
	ConnectionID() string
	Stat(name string) (os.FileInfo, error)
	Lstat(name string) (os.FileInfo, error)
	// Open and Create return either a File or a pipe, depending on the
	// backend; the func() is a cleanup callback — NOTE(review): confirm the
	// exact contract against the concrete implementations.
	Open(name string, offset int64) (File, PipeReader, func(), error)
	Create(name string, flag, checks int) (File, PipeWriter, func(), error)
	Rename(source, target string) (int, int64, error)
	Remove(name string, isDir bool) error
	Mkdir(name string) error
	Symlink(source, target string) error
	Chown(name string, uid int, gid int) error
	Chmod(name string, mode os.FileMode) error
	Chtimes(name string, atime, mtime time.Time, isUploading bool) error
	Truncate(name string, size int64) error
	ReadDir(dirname string) (DirLister, error)
	Readlink(name string) (string, error)
	// capability probes, used by callers to pick an upload/resume strategy
	IsUploadResumeSupported() bool
	IsConditionalUploadResumeSupported(size int64) bool
	IsAtomicUploadSupported() bool
	CheckRootPath(username string, uid int, gid int) bool
	ResolvePath(virtualPath string) (string, error)
	// error classification helpers so callers stay backend-agnostic
	IsNotExist(err error) bool
	IsPermission(err error) bool
	IsNotSupported(err error) bool
	ScanRootDirContents() (int, int64, error)
	GetDirSize(dirname string) (int, int64, error)
	GetAtomicUploadPath(name string) string
	GetRelativePath(name string) string
	Walk(root string, walkFn filepath.WalkFunc) error
	Join(elem ...string) string
	HasVirtualFolders() bool
	GetMimeType(name string) (string, error)
	GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
	Close() error
}
// FsRealPather is a Fs that implements the RealPath method.
type FsRealPather interface {
	Fs
	RealPath(p string) (string, error)
}

// FsFileCopier is a Fs that implements the CopyFile method.
type FsFileCopier interface {
	Fs
	CopyFile(source, target string, srcSize int64) (int, int64, error)
}

// File defines an interface representing a SFTPGo file
type File interface {
	io.Reader
	io.Writer
	io.Closer
	io.ReaderAt
	io.WriterAt
	io.Seeker
	Stat() (os.FileInfo, error)
	Name() string
	Truncate(size int64) error
}

// PipeWriter defines an interface representing a SFTPGo pipe writer
type PipeWriter interface {
	io.Writer
	io.WriterAt
	io.Closer
	// Done signals the end of the upload with its outcome; it must be
	// called so that Close() can return (see the pipeWriter implementation).
	Done(err error)
	GetWrittenBytes() int64
}

// PipeReader defines an interface representing a SFTPGo pipe reader
type PipeReader interface {
	io.Reader
	io.ReaderAt
	io.Closer
	// unexported setters keep metadata population internal to this package
	setMetadata(value map[string]string)
	setMetadataFromPointerVal(value map[string]*string)
	Metadata() map[string]string
}

// DirLister defines an interface for a directory lister
type DirLister interface {
	// Next returns the next batch of entries, at most limit; it returns
	// io.EOF together with the final entries (see baseDirLister).
	Next(limit int) ([]os.FileInfo, error)
	Close() error
}

// Metadater defines an interface to implement to return metadata for a file
type Metadater interface {
	Metadata() map[string]string
}
  184. type baseDirLister struct {
  185. cache []os.FileInfo
  186. }
  187. func (l *baseDirLister) Next(limit int) ([]os.FileInfo, error) {
  188. if limit <= 0 {
  189. return nil, errInvalidDirListerLimit
  190. }
  191. if len(l.cache) >= limit {
  192. return l.returnFromCache(limit), nil
  193. }
  194. return l.returnFromCache(limit), io.EOF
  195. }
  196. func (l *baseDirLister) returnFromCache(limit int) []os.FileInfo {
  197. if len(l.cache) >= limit {
  198. result := l.cache[:limit]
  199. l.cache = l.cache[limit:]
  200. return result
  201. }
  202. result := l.cache
  203. l.cache = nil
  204. return result
  205. }
  206. func (l *baseDirLister) Close() error {
  207. l.cache = nil
  208. return nil
  209. }
  210. // QuotaCheckResult defines the result for a quota check
  211. type QuotaCheckResult struct {
  212. HasSpace bool
  213. AllowedSize int64
  214. AllowedFiles int
  215. UsedSize int64
  216. UsedFiles int
  217. QuotaSize int64
  218. QuotaFiles int
  219. }
  220. // GetRemainingSize returns the remaining allowed size
  221. func (q *QuotaCheckResult) GetRemainingSize() int64 {
  222. if q.QuotaSize > 0 {
  223. return q.QuotaSize - q.UsedSize
  224. }
  225. return 0
  226. }
  227. // GetRemainingFiles returns the remaining allowed files
  228. func (q *QuotaCheckResult) GetRemainingFiles() int {
  229. if q.QuotaFiles > 0 {
  230. return q.QuotaFiles - q.UsedFiles
  231. }
  232. return 0
  233. }
// S3FsConfig defines the configuration for S3 based filesystem
type S3FsConfig struct {
	sdk.BaseS3FsConfig
	// AccessSecret is the S3 secret key, stored as a kms.Secret so it can
	// be kept encrypted at rest (see ValidateAndEncryptCredentials).
	AccessSecret *kms.Secret `json:"access_secret,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *S3FsConfig) HideConfidentialData() {
	if c.AccessSecret != nil {
		c.AccessSecret.Hide()
	}
}
  245. func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
  246. if c.Bucket != other.Bucket {
  247. return false
  248. }
  249. if c.KeyPrefix != other.KeyPrefix {
  250. return false
  251. }
  252. if c.Region != other.Region {
  253. return false
  254. }
  255. if c.AccessKey != other.AccessKey {
  256. return false
  257. }
  258. if c.RoleARN != other.RoleARN {
  259. return false
  260. }
  261. if c.Endpoint != other.Endpoint {
  262. return false
  263. }
  264. if c.StorageClass != other.StorageClass {
  265. return false
  266. }
  267. if c.ACL != other.ACL {
  268. return false
  269. }
  270. if !c.areMultipartFieldsEqual(other) {
  271. return false
  272. }
  273. if c.ForcePathStyle != other.ForcePathStyle {
  274. return false
  275. }
  276. if c.SkipTLSVerify != other.SkipTLSVerify {
  277. return false
  278. }
  279. return c.isSecretEqual(other)
  280. }
  281. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  282. if c.UploadPartSize != other.UploadPartSize {
  283. return false
  284. }
  285. if c.UploadConcurrency != other.UploadConcurrency {
  286. return false
  287. }
  288. if c.DownloadConcurrency != other.DownloadConcurrency {
  289. return false
  290. }
  291. if c.DownloadPartSize != other.DownloadPartSize {
  292. return false
  293. }
  294. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  295. return false
  296. }
  297. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  298. return false
  299. }
  300. return true
  301. }
  302. func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
  303. if c.AccessSecret == nil {
  304. c.AccessSecret = kms.NewEmptySecret()
  305. }
  306. if other.AccessSecret == nil {
  307. other.AccessSecret = kms.NewEmptySecret()
  308. }
  309. return c.AccessSecret.IsEqual(other.AccessSecret)
  310. }
  311. func (c *S3FsConfig) checkCredentials() error {
  312. if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
  313. return util.NewI18nError(
  314. errors.New("access_key cannot be empty with access_secret not empty"),
  315. util.I18nErrorAccessKeyRequired,
  316. )
  317. }
  318. if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
  319. return util.NewI18nError(
  320. errors.New("access_secret cannot be empty with access_key not empty"),
  321. util.I18nErrorAccessSecretRequired,
  322. )
  323. }
  324. if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
  325. return errors.New("invalid encrypted access_secret")
  326. }
  327. if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
  328. return errors.New("invalid access_secret")
  329. }
  330. return nil
  331. }
  332. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  333. func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  334. if err := c.validate(); err != nil {
  335. var errI18n *util.I18nError
  336. errValidation := util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
  337. if errors.As(err, &errI18n) {
  338. return util.NewI18nError(errValidation, errI18n.Message)
  339. }
  340. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  341. }
  342. if c.AccessSecret.IsPlain() {
  343. c.AccessSecret.SetAdditionalData(additionalData)
  344. err := c.AccessSecret.Encrypt()
  345. if err != nil {
  346. return util.NewI18nError(
  347. util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err)),
  348. util.I18nErrorFsValidation,
  349. )
  350. }
  351. }
  352. return nil
  353. }
  354. func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
  355. if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
  356. return util.NewI18nError(
  357. errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
  358. util.I18nErrorULPartSizeInvalid,
  359. )
  360. }
  361. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  362. return util.NewI18nError(
  363. fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
  364. util.I18nErrorULConcurrencyInvalid,
  365. )
  366. }
  367. if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
  368. return util.NewI18nError(
  369. errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
  370. util.I18nErrorDLPartSizeInvalid,
  371. )
  372. }
  373. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  374. return util.NewI18nError(
  375. fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency),
  376. util.I18nErrorDLConcurrencyInvalid,
  377. )
  378. }
  379. return nil
  380. }
  381. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  382. if c.Bucket != other.Bucket {
  383. return false
  384. }
  385. if c.Endpoint != other.Endpoint {
  386. return false
  387. }
  388. return c.Region == other.Region
  389. }
// validate returns an error if the configuration is not valid.
// It also normalizes the configuration in place: nil secret, key prefix
// and trimmed storage class/ACL.
func (c *S3FsConfig) validate() error {
	// normalize a nil secret so the checks below can safely call its methods
	if c.AccessSecret == nil {
		c.AccessSecret = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	// the region may be embedded within the endpoint for some S3 compatible
	// object storage, for example B2
	if c.Endpoint == "" && c.Region == "" {
		return util.NewI18nError(errors.New("region cannot be empty"), util.I18nErrorRegionRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		// the prefix is normalized to "dir1/dir2/": no leading, one trailing slash
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	return c.checkPartSizeAndConcurrency()
}
// GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
type GCSFsConfig struct {
	sdk.BaseGCSFsConfig
	// Credentials is the service account JSON, stored as a kms.Secret so it
	// can be kept encrypted at rest; unused with automatic credentials.
	Credentials *kms.Secret `json:"credentials,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *GCSFsConfig) HideConfidentialData() {
	if c.Credentials != nil {
		c.Credentials.Hide()
	}
}
  430. // ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text
  431. func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  432. if err := c.validate(); err != nil {
  433. var errI18n *util.I18nError
  434. errValidation := util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
  435. if errors.As(err, &errI18n) {
  436. return util.NewI18nError(errValidation, errI18n.Message)
  437. }
  438. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  439. }
  440. if c.Credentials.IsPlain() {
  441. c.Credentials.SetAdditionalData(additionalData)
  442. err := c.Credentials.Encrypt()
  443. if err != nil {
  444. return util.NewI18nError(
  445. util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err)),
  446. util.I18nErrorFsValidation,
  447. )
  448. }
  449. }
  450. return nil
  451. }
  452. func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
  453. if c.Bucket != other.Bucket {
  454. return false
  455. }
  456. if c.KeyPrefix != other.KeyPrefix {
  457. return false
  458. }
  459. if c.AutomaticCredentials != other.AutomaticCredentials {
  460. return false
  461. }
  462. if c.StorageClass != other.StorageClass {
  463. return false
  464. }
  465. if c.ACL != other.ACL {
  466. return false
  467. }
  468. if c.UploadPartSize != other.UploadPartSize {
  469. return false
  470. }
  471. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  472. return false
  473. }
  474. if c.Credentials == nil {
  475. c.Credentials = kms.NewEmptySecret()
  476. }
  477. if other.Credentials == nil {
  478. other.Credentials = kms.NewEmptySecret()
  479. }
  480. return c.Credentials.IsEqual(other.Credentials)
  481. }
  482. func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
  483. return c.Bucket == other.Bucket
  484. }
// validate returns an error if the configuration is not valid.
// It also normalizes the configuration in place: credentials, key prefix,
// trimmed storage class/ACL and negative tuning values reset to 0.
func (c *GCSFsConfig) validate() error {
	// with automatic credentials any explicit credentials are discarded
	if c.Credentials == nil || c.AutomaticCredentials == 1 {
		c.Credentials = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	if c.KeyPrefix != "" {
		// the prefix is normalized to "dir1/dir2/": no leading, one trailing slash
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
		return errors.New("invalid encrypted credentials")
	}
	// explicit credentials are required when automatic ones are disabled
	if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
		return util.NewI18nError(errors.New("invalid credentials"), util.I18nErrorFsCredentialsRequired)
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	if c.UploadPartSize < 0 {
		c.UploadPartSize = 0
	}
	if c.UploadPartMaxTime < 0 {
		c.UploadPartMaxTime = 0
	}
	return nil
}
// AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
type AzBlobFsConfig struct {
	sdk.BaseAzBlobFsConfig
	// Storage Account Key leave blank to use SAS URL.
	// The access key is stored encrypted based on the kms configuration
	AccountKey *kms.Secret `json:"account_key,omitempty"`
	// Shared access signature URL, leave blank if using account/key
	SASURL *kms.Secret `json:"sas_url,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *AzBlobFsConfig) HideConfidentialData() {
	if c.AccountKey != nil {
		c.AccountKey.Hide()
	}
	if c.SASURL != nil {
		c.SASURL.Hide()
	}
}
// isEqual reports whether the two configurations are identical, SAS URL
// and account key included.
func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
	if c.Container != other.Container {
		return false
	}
	if c.AccountName != other.AccountName {
		return false
	}
	if c.Endpoint != other.Endpoint {
		return false
	}
	// NOTE(review): unlike isSecretEqual below, these guards call IsEmpty()
	// instead of checking for nil; if SASURL can ever be nil here this would
	// dereference a nil pointer — confirm callers always validate() first.
	if c.SASURL.IsEmpty() {
		c.SASURL = kms.NewEmptySecret()
	}
	if other.SASURL.IsEmpty() {
		other.SASURL = kms.NewEmptySecret()
	}
	if !c.SASURL.IsEqual(other.SASURL) {
		return false
	}
	if c.KeyPrefix != other.KeyPrefix {
		return false
	}
	if c.UploadPartSize != other.UploadPartSize {
		return false
	}
	if c.UploadConcurrency != other.UploadConcurrency {
		return false
	}
	if c.DownloadPartSize != other.DownloadPartSize {
		return false
	}
	if c.DownloadConcurrency != other.DownloadConcurrency {
		return false
	}
	if c.UseEmulator != other.UseEmulator {
		return false
	}
	if c.AccessTier != other.AccessTier {
		return false
	}
	return c.isSecretEqual(other)
}

// isSecretEqual compares the account keys, normalizing nil secrets to
// empty ones first so the comparison cannot dereference nil.
func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
	if c.AccountKey == nil {
		c.AccountKey = kms.NewEmptySecret()
	}
	if other.AccountKey == nil {
		other.AccountKey = kms.NewEmptySecret()
	}
	return c.AccountKey.IsEqual(other.AccountKey)
}
  587. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  588. func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  589. if err := c.validate(); err != nil {
  590. var errI18n *util.I18nError
  591. errValidation := util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
  592. if errors.As(err, &errI18n) {
  593. return util.NewI18nError(errValidation, errI18n.Message)
  594. }
  595. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  596. }
  597. if c.AccountKey.IsPlain() {
  598. c.AccountKey.SetAdditionalData(additionalData)
  599. if err := c.AccountKey.Encrypt(); err != nil {
  600. return util.NewI18nError(
  601. util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err)),
  602. util.I18nErrorFsValidation,
  603. )
  604. }
  605. }
  606. if c.SASURL.IsPlain() {
  607. c.SASURL.SetAdditionalData(additionalData)
  608. if err := c.SASURL.Encrypt(); err != nil {
  609. return util.NewI18nError(
  610. util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err)),
  611. util.I18nErrorFsValidation,
  612. )
  613. }
  614. }
  615. return nil
  616. }
  617. func (c *AzBlobFsConfig) checkCredentials() error {
  618. if c.SASURL.IsPlain() {
  619. _, err := url.Parse(c.SASURL.GetPayload())
  620. if err != nil {
  621. return util.NewI18nError(err, util.I18nErrorSASURLInvalid)
  622. }
  623. return nil
  624. }
  625. if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
  626. return errors.New("invalid encrypted sas_url")
  627. }
  628. if !c.SASURL.IsEmpty() {
  629. return nil
  630. }
  631. if c.AccountName == "" || !c.AccountKey.IsValidInput() {
  632. return util.NewI18nError(errors.New("credentials cannot be empty or invalid"), util.I18nErrorAccountNameRequired)
  633. }
  634. if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
  635. return errors.New("invalid encrypted account_key")
  636. }
  637. return nil
  638. }
  639. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  640. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  641. return util.NewI18nError(
  642. fmt.Errorf("invalid upload part size: %v", c.UploadPartSize),
  643. util.I18nErrorULPartSizeInvalid,
  644. )
  645. }
  646. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  647. return util.NewI18nError(
  648. fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
  649. util.I18nErrorULConcurrencyInvalid,
  650. )
  651. }
  652. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  653. return util.NewI18nError(
  654. fmt.Errorf("invalid download part size: %v", c.DownloadPartSize),
  655. util.I18nErrorDLPartSizeInvalid,
  656. )
  657. }
  658. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  659. return util.NewI18nError(
  660. fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency),
  661. util.I18nErrorDLConcurrencyInvalid,
  662. )
  663. }
  664. return nil
  665. }
// tryDecrypt attempts to decrypt the account key and the SAS URL in place,
// returning the first wrapped error encountered.
func (c *AzBlobFsConfig) tryDecrypt() error {
	if err := c.AccountKey.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt account key: %w", err)
	}
	if err := c.SASURL.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt SAS URL: %w", err)
	}
	return nil
}

// isSameResource reports whether both configurations point to the same
// storage account; the SAS URL payloads are compared as plain strings.
func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
	if c.AccountName != other.AccountName {
		return false
	}
	if c.Endpoint != other.Endpoint {
		return false
	}
	return c.SASURL.GetPayload() == other.SASURL.GetPayload()
}
// validate returns an error if the configuration is not valid.
// It also normalizes the configuration in place: nil secrets and key prefix.
func (c *AzBlobFsConfig) validate() error {
	// normalize nil secrets so the checks below can safely call their methods
	if c.AccountKey == nil {
		c.AccountKey = kms.NewEmptySecret()
	}
	if c.SASURL == nil {
		c.SASURL = kms.NewEmptySecret()
	}
	// container could be embedded within SAS URL we check this at runtime
	if c.SASURL.IsEmpty() && c.Container == "" {
		return util.NewI18nError(errors.New("container cannot be empty"), util.I18nErrorContainerRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		// the prefix is normalized to "dir1/dir2/": no leading, one trailing slash
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if err := c.checkPartSizeAndConcurrency(); err != nil {
		return err
	}
	if !util.Contains(validAzAccessTier, c.AccessTier) {
		return fmt.Errorf("invalid access tier %q, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
	}
	return nil
}
// CryptFsConfig defines the configuration to store local files as encrypted
type CryptFsConfig struct {
	sdk.OSFsConfig
	// Passphrase used to derive the encryption key, stored as a kms.Secret
	// so it can be kept encrypted at rest.
	Passphrase *kms.Secret `json:"passphrase,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *CryptFsConfig) HideConfidentialData() {
	if c.Passphrase != nil {
		c.Passphrase.Hide()
	}
}
  727. func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
  728. if c.Passphrase == nil {
  729. c.Passphrase = kms.NewEmptySecret()
  730. }
  731. if other.Passphrase == nil {
  732. other.Passphrase = kms.NewEmptySecret()
  733. }
  734. return c.Passphrase.IsEqual(other.Passphrase)
  735. }
  736. // ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
  737. func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  738. if err := c.validate(); err != nil {
  739. var errI18n *util.I18nError
  740. errValidation := util.NewValidationError(fmt.Sprintf("could not validate crypt fs config: %v", err))
  741. if errors.As(err, &errI18n) {
  742. return util.NewI18nError(errValidation, errI18n.Message)
  743. }
  744. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  745. }
  746. if c.Passphrase.IsPlain() {
  747. c.Passphrase.SetAdditionalData(additionalData)
  748. if err := c.Passphrase.Encrypt(); err != nil {
  749. return util.NewI18nError(
  750. util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err)),
  751. util.I18nErrorFsValidation,
  752. )
  753. }
  754. }
  755. return nil
  756. }
  757. func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
  758. return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
  759. }
  760. // validate returns an error if the configuration is not valid
  761. func (c *CryptFsConfig) validate() error {
  762. if c.Passphrase == nil || c.Passphrase.IsEmpty() {
  763. return util.NewI18nError(errors.New("invalid passphrase"), util.I18nErrorPassphraseRequired)
  764. }
  765. if !c.Passphrase.IsValidInput() {
  766. return util.NewI18nError(errors.New("passphrase cannot be empty or invalid"), util.I18nErrorPassphraseRequired)
  767. }
  768. if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
  769. return errors.New("invalid encrypted passphrase")
  770. }
  771. return nil
  772. }
  773. // pipeWriter defines a wrapper for pipeat.PipeWriterAt.
  774. type pipeWriter struct {
  775. *pipeat.PipeWriterAt
  776. err error
  777. done chan bool
  778. }
  779. // NewPipeWriter initializes a new PipeWriter
  780. func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter {
  781. return &pipeWriter{
  782. PipeWriterAt: w,
  783. err: nil,
  784. done: make(chan bool),
  785. }
  786. }
// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
func (p *pipeWriter) Close() error {
	p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null
	// Block until Done signals that the upload finished; only then is p.err
	// guaranteed to hold the final upload result.
	<-p.done
	return p.err
}
// Done unlocks other goroutines waiting on Close().
// It must be called when the upload ends
func (p *pipeWriter) Done(err error) {
	// Record the result before signaling: Close reads p.err only after
	// receiving on the channel, so this ordering makes the write safe.
	p.err = err
	p.done <- true
}
  799. func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter {
  800. return &pipeWriterAtOffset{
  801. pipeWriter: &pipeWriter{
  802. PipeWriterAt: w,
  803. err: nil,
  804. done: make(chan bool),
  805. },
  806. offset: offset,
  807. writeOffset: offset,
  808. }
  809. }
// pipeWriterAtOffset is a pipeWriter that rejects writes before a fixed
// start offset and translates accepted offsets relative to it.
type pipeWriterAtOffset struct {
	*pipeWriter
	offset      int64 // minimum accepted absolute offset; subtracted before delegating
	writeOffset int64 // next absolute offset used by sequential Write calls
}
  815. func (p *pipeWriterAtOffset) WriteAt(buf []byte, off int64) (int, error) {
  816. if off < p.offset {
  817. return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, p.offset)
  818. }
  819. return p.pipeWriter.WriteAt(buf, off-p.offset)
  820. }
  821. func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
  822. n, err := p.WriteAt(buf, p.writeOffset)
  823. p.writeOffset += int64(n)
  824. return n, err
  825. }
  826. // NewPipeReader initializes a new PipeReader
  827. func NewPipeReader(r *pipeat.PipeReaderAt) PipeReader {
  828. return &pipeReader{
  829. PipeReaderAt: r,
  830. }
  831. }
// pipeReader defines a wrapper for pipeat.PipeReaderAt.
type pipeReader struct {
	*pipeat.PipeReaderAt
	mu       sync.RWMutex
	metadata map[string]string // guarded by mu; set via setMetadata*, read via Metadata
}
  838. func (p *pipeReader) setMetadata(value map[string]string) {
  839. p.mu.Lock()
  840. defer p.mu.Unlock()
  841. p.metadata = value
  842. }
  843. func (p *pipeReader) setMetadataFromPointerVal(value map[string]*string) {
  844. p.mu.Lock()
  845. defer p.mu.Unlock()
  846. if len(value) == 0 {
  847. p.metadata = nil
  848. return
  849. }
  850. p.metadata = map[string]string{}
  851. for k, v := range value {
  852. val := util.GetStringFromPointer(v)
  853. if val != "" {
  854. p.metadata[k] = val
  855. }
  856. }
  857. }
  858. // Metadata implements the Metadater interface
  859. func (p *pipeReader) Metadata() map[string]string {
  860. p.mu.RLock()
  861. defer p.mu.RUnlock()
  862. if len(p.metadata) == 0 {
  863. return nil
  864. }
  865. result := make(map[string]string)
  866. for k, v := range p.metadata {
  867. result[k] = v
  868. }
  869. return result
  870. }
  871. func isEqualityCheckModeValid(mode int) bool {
  872. return mode >= 0 || mode <= 1
  873. }
  874. // isDirectory checks if a path exists and is a directory
  875. func isDirectory(fs Fs, path string) (bool, error) {
  876. fileInfo, err := fs.Stat(path)
  877. if err != nil {
  878. return false, err
  879. }
  880. return fileInfo.IsDir(), err
  881. }
  882. // IsLocalOsFs returns true if fs is a local filesystem implementation
  883. func IsLocalOsFs(fs Fs) bool {
  884. return fs.Name() == osFsName
  885. }
  886. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  887. func IsCryptOsFs(fs Fs) bool {
  888. return fs.Name() == cryptFsName
  889. }
  890. // IsSFTPFs returns true if fs is an SFTP filesystem
  891. func IsSFTPFs(fs Fs) bool {
  892. return strings.HasPrefix(fs.Name(), sftpFsName)
  893. }
  894. // IsHTTPFs returns true if fs is an HTTP filesystem
  895. func IsHTTPFs(fs Fs) bool {
  896. return strings.HasPrefix(fs.Name(), httpFsName)
  897. }
  898. // IsBufferedLocalOrSFTPFs returns true if this is a buffered SFTP or local filesystem
  899. func IsBufferedLocalOrSFTPFs(fs Fs) bool {
  900. if osFs, ok := fs.(*OsFs); ok {
  901. return osFs.writeBufferSize > 0
  902. }
  903. if !IsSFTPFs(fs) {
  904. return false
  905. }
  906. return !fs.IsUploadResumeSupported()
  907. }
  908. // FsOpenReturnsFile returns true if fs.Open returns a *os.File handle
  909. func FsOpenReturnsFile(fs Fs) bool {
  910. if osFs, ok := fs.(*OsFs); ok {
  911. return osFs.readBufferSize == 0
  912. }
  913. if sftpFs, ok := fs.(*SFTPFs); ok {
  914. return sftpFs.config.BufferSize == 0
  915. }
  916. return false
  917. }
  918. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  919. func IsLocalOrSFTPFs(fs Fs) bool {
  920. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  921. }
  922. // HasTruncateSupport returns true if the fs supports truncate files
  923. func HasTruncateSupport(fs Fs) bool {
  924. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  925. }
  926. // IsRenameAtomic returns true if renaming a directory is supposed to be atomic
  927. func IsRenameAtomic(fs Fs) bool {
  928. if strings.HasPrefix(fs.Name(), s3fsName) {
  929. return false
  930. }
  931. if strings.HasPrefix(fs.Name(), gcsfsName) {
  932. return false
  933. }
  934. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  935. return false
  936. }
  937. return true
  938. }
  939. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  940. func HasImplicitAtomicUploads(fs Fs) bool {
  941. if strings.HasPrefix(fs.Name(), s3fsName) {
  942. return uploadMode&4 == 0
  943. }
  944. if strings.HasPrefix(fs.Name(), gcsfsName) {
  945. return uploadMode&8 == 0
  946. }
  947. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  948. return uploadMode&16 == 0
  949. }
  950. return false
  951. }
  952. // HasOpenRWSupport returns true if the fs can open a file
  953. // for reading and writing at the same time
  954. func HasOpenRWSupport(fs Fs) bool {
  955. if IsLocalOsFs(fs) {
  956. return true
  957. }
  958. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  959. return true
  960. }
  961. return false
  962. }
  963. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  964. func IsLocalOrCryptoFs(fs Fs) bool {
  965. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  966. }
  967. // SetPathPermissions calls fs.Chown.
  968. // It does nothing for local filesystem on windows
  969. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  970. if uid == -1 && gid == -1 {
  971. return
  972. }
  973. if IsLocalOsFs(fs) {
  974. if runtime.GOOS == "windows" {
  975. return
  976. }
  977. }
  978. if err := fs.Chown(path, uid, gid); err != nil {
  979. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  980. }
  981. }
  982. // IsUploadResumeSupported returns true if resuming uploads is supported
  983. func IsUploadResumeSupported(fs Fs, size int64) bool {
  984. if fs.IsUploadResumeSupported() {
  985. return true
  986. }
  987. return fs.IsConditionalUploadResumeSupported(size)
  988. }
  989. func getLastModified(metadata map[string]string) int64 {
  990. if val, ok := metadata[lastModifiedField]; ok {
  991. lastModified, err := strconv.ParseInt(val, 10, 64)
  992. if err == nil {
  993. return lastModified
  994. }
  995. }
  996. return 0
  997. }
  998. func getAzureLastModified(metadata map[string]*string) int64 {
  999. for k, v := range metadata {
  1000. if strings.ToLower(k) == lastModifiedField {
  1001. if val := util.GetStringFromPointer(v); val != "" {
  1002. lastModified, err := strconv.ParseInt(val, 10, 64)
  1003. if err == nil {
  1004. return lastModified
  1005. }
  1006. }
  1007. return 0
  1008. }
  1009. }
  1010. return 0
  1011. }
  1012. func validateOSFsConfig(config *sdk.OSFsConfig) error {
  1013. if config.ReadBufferSize < 0 || config.ReadBufferSize > 10 {
  1014. return fmt.Errorf("invalid read buffer size must be between 0 and 10 MB")
  1015. }
  1016. if config.WriteBufferSize < 0 || config.WriteBufferSize > 10 {
  1017. return fmt.Errorf("invalid write buffer size must be between 0 and 10 MB")
  1018. }
  1019. return nil
  1020. }
  1021. func doCopy(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
  1022. if buf == nil {
  1023. buf = make([]byte, 32768)
  1024. }
  1025. for {
  1026. nr, er := src.Read(buf)
  1027. if nr > 0 {
  1028. nw, ew := dst.Write(buf[0:nr])
  1029. if nw < 0 || nr < nw {
  1030. nw = 0
  1031. if ew == nil {
  1032. ew = errors.New("invalid write")
  1033. }
  1034. }
  1035. written += int64(nw)
  1036. if ew != nil {
  1037. err = ew
  1038. break
  1039. }
  1040. if nr != nw {
  1041. err = io.ErrShortWrite
  1042. break
  1043. }
  1044. }
  1045. if er != nil {
  1046. if er != io.EOF {
  1047. err = er
  1048. }
  1049. break
  1050. }
  1051. }
  1052. return written, err
  1053. }
  1054. func getMountPath(mountPath string) string {
  1055. if mountPath == "/" {
  1056. return ""
  1057. }
  1058. return mountPath
  1059. }
  1060. func getLocalTempDir() string {
  1061. if tempPath != "" {
  1062. return tempPath
  1063. }
  1064. return filepath.Clean(os.TempDir())
  1065. }
// doRecursiveRename walks the source directory and invokes renameFn for each
// entry, accumulating the number of renamed files and their total size.
// Recursion depth is bounded by util.MaxRecursion; entries that disappear
// during the walk are logged and skipped rather than treated as failures.
func doRecursiveRename(fs Fs, source, target string,
	renameFn func(string, string, os.FileInfo, int) (int, int64, error),
	recursion int,
) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if recursion > util.MaxRecursion {
		return numFiles, filesSize, util.ErrRecursionTooDeep
	}
	recursion++
	lister, err := fs.ReadDir(source)
	if err != nil {
		return numFiles, filesSize, err
	}
	defer lister.Close()
	for {
		entries, err := lister.Next(ListerBatchSize)
		// io.EOF marks the end of the listing; any other error aborts the walk.
		finished := errors.Is(err, io.EOF)
		if err != nil && !finished {
			return numFiles, filesSize, err
		}
		for _, info := range entries {
			sourceEntry := fs.Join(source, info.Name())
			targetEntry := fs.Join(target, info.Name())
			files, size, err := renameFn(sourceEntry, targetEntry, info, recursion)
			if err != nil {
				// Entry removed concurrently: best-effort, continue with the rest.
				if fs.IsNotExist(err) {
					fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
					continue
				}
				return numFiles, filesSize, err
			}
			numFiles += files
			filesSize += size
		}
		if finished {
			return numFiles, filesSize, nil
		}
	}
}
// fsLog logs a formatted message at the given level, tagging it with the
// filesystem name and the connection ID for correlation.
func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
	logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
}