vfs.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "strings"
  26. "time"
  27. "github.com/eikenb/pipeat"
  28. "github.com/pkg/sftp"
  29. "github.com/sftpgo/sdk"
  30. "github.com/sftpgo/sdk/plugin/metadata"
  31. "github.com/drakkan/sftpgo/v2/internal/kms"
  32. "github.com/drakkan/sftpgo/v2/internal/logger"
  33. "github.com/drakkan/sftpgo/v2/internal/plugin"
  34. "github.com/drakkan/sftpgo/v2/internal/util"
  35. )
  36. const (
  37. dirMimeType = "inode/directory"
  38. s3fsName = "S3Fs"
  39. gcsfsName = "GCSFs"
  40. azBlobFsName = "AzureBlobFs"
  41. )
  42. // Additional checks for files
  43. const (
  44. CheckParentDir = 1
  45. )
  46. var (
  47. validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
  48. // ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
  49. ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
  50. // ErrVfsUnsupported defines the error for an unsupported VFS operation
  51. ErrVfsUnsupported = errors.New("not supported")
  52. tempPath string
  53. sftpFingerprints []string
  54. allowSelfConnections int
  55. renameMode int
  56. )
  57. // SetAllowSelfConnections sets the desired behaviour for self connections
  58. func SetAllowSelfConnections(value int) {
  59. allowSelfConnections = value
  60. }
  61. // SetTempPath sets the path for temporary files
  62. func SetTempPath(fsPath string) {
  63. tempPath = fsPath
  64. }
  65. // GetTempPath returns the path for temporary files
  66. func GetTempPath() string {
  67. return tempPath
  68. }
  69. // SetSFTPFingerprints sets the SFTP host key fingerprints
  70. func SetSFTPFingerprints(fp []string) {
  71. sftpFingerprints = fp
  72. }
  73. // SetRenameMode sets the rename mode
  74. func SetRenameMode(val int) {
  75. renameMode = val
  76. }
  77. // Fs defines the interface for filesystem backends
  78. type Fs interface {
  79. Name() string
  80. ConnectionID() string
  81. Stat(name string) (os.FileInfo, error)
  82. Lstat(name string) (os.FileInfo, error)
  83. Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error)
  84. Create(name string, flag, checks int) (File, *PipeWriter, func(), error)
  85. Rename(source, target string) (int, int64, error)
  86. Remove(name string, isDir bool) error
  87. Mkdir(name string) error
  88. Symlink(source, target string) error
  89. Chown(name string, uid int, gid int) error
  90. Chmod(name string, mode os.FileMode) error
  91. Chtimes(name string, atime, mtime time.Time, isUploading bool) error
  92. Truncate(name string, size int64) error
  93. ReadDir(dirname string) ([]os.FileInfo, error)
  94. Readlink(name string) (string, error)
  95. IsUploadResumeSupported() bool
  96. IsAtomicUploadSupported() bool
  97. CheckRootPath(username string, uid int, gid int) bool
  98. ResolvePath(virtualPath string) (string, error)
  99. IsNotExist(err error) bool
  100. IsPermission(err error) bool
  101. IsNotSupported(err error) bool
  102. ScanRootDirContents() (int, int64, error)
  103. GetDirSize(dirname string) (int, int64, error)
  104. GetAtomicUploadPath(name string) string
  105. GetRelativePath(name string) string
  106. Walk(root string, walkFn filepath.WalkFunc) error
  107. Join(elem ...string) string
  108. HasVirtualFolders() bool
  109. GetMimeType(name string) (string, error)
  110. GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
  111. CheckMetadata() error
  112. Close() error
  113. }
  114. // FsRealPather is a Fs that implements the RealPath method.
  115. type FsRealPather interface {
  116. Fs
  117. RealPath(p string) (string, error)
  118. }
  119. // fsMetadataChecker is a Fs that implements the getFileNamesInPrefix method.
  120. // This interface is used to abstract metadata consistency checks
  121. type fsMetadataChecker interface {
  122. Fs
  123. getFileNamesInPrefix(fsPrefix string) (map[string]bool, error)
  124. }
  125. // FsFileCopier is a Fs that implements the CopyFile method.
  126. type FsFileCopier interface {
  127. Fs
  128. CopyFile(source, target string, srcSize int64) error
  129. }
  130. // File defines an interface representing a SFTPGo file
  131. type File interface {
  132. io.Reader
  133. io.Writer
  134. io.Closer
  135. io.ReaderAt
  136. io.WriterAt
  137. io.Seeker
  138. Stat() (os.FileInfo, error)
  139. Name() string
  140. Truncate(size int64) error
  141. }
  142. // QuotaCheckResult defines the result for a quota check
  143. type QuotaCheckResult struct {
  144. HasSpace bool
  145. AllowedSize int64
  146. AllowedFiles int
  147. UsedSize int64
  148. UsedFiles int
  149. QuotaSize int64
  150. QuotaFiles int
  151. }
  152. // GetRemainingSize returns the remaining allowed size
  153. func (q *QuotaCheckResult) GetRemainingSize() int64 {
  154. if q.QuotaSize > 0 {
  155. return q.QuotaSize - q.UsedSize
  156. }
  157. return 0
  158. }
  159. // GetRemainingFiles returns the remaining allowed files
  160. func (q *QuotaCheckResult) GetRemainingFiles() int {
  161. if q.QuotaFiles > 0 {
  162. return q.QuotaFiles - q.UsedFiles
  163. }
  164. return 0
  165. }
  166. // S3FsConfig defines the configuration for S3 based filesystem
  167. type S3FsConfig struct {
  168. sdk.BaseS3FsConfig
  169. AccessSecret *kms.Secret `json:"access_secret,omitempty"`
  170. }
  171. // HideConfidentialData hides confidential data
  172. func (c *S3FsConfig) HideConfidentialData() {
  173. if c.AccessSecret != nil {
  174. c.AccessSecret.Hide()
  175. }
  176. }
  177. func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
  178. if c.Bucket != other.Bucket {
  179. return false
  180. }
  181. if c.KeyPrefix != other.KeyPrefix {
  182. return false
  183. }
  184. if c.Region != other.Region {
  185. return false
  186. }
  187. if c.AccessKey != other.AccessKey {
  188. return false
  189. }
  190. if c.RoleARN != other.RoleARN {
  191. return false
  192. }
  193. if c.Endpoint != other.Endpoint {
  194. return false
  195. }
  196. if c.StorageClass != other.StorageClass {
  197. return false
  198. }
  199. if c.ACL != other.ACL {
  200. return false
  201. }
  202. if !c.areMultipartFieldsEqual(other) {
  203. return false
  204. }
  205. if c.ForcePathStyle != other.ForcePathStyle {
  206. return false
  207. }
  208. return c.isSecretEqual(other)
  209. }
  210. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  211. if c.UploadPartSize != other.UploadPartSize {
  212. return false
  213. }
  214. if c.UploadConcurrency != other.UploadConcurrency {
  215. return false
  216. }
  217. if c.DownloadConcurrency != other.DownloadConcurrency {
  218. return false
  219. }
  220. if c.DownloadPartSize != other.DownloadPartSize {
  221. return false
  222. }
  223. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  224. return false
  225. }
  226. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  227. return false
  228. }
  229. return true
  230. }
  231. func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
  232. if c.AccessSecret == nil {
  233. c.AccessSecret = kms.NewEmptySecret()
  234. }
  235. if other.AccessSecret == nil {
  236. other.AccessSecret = kms.NewEmptySecret()
  237. }
  238. return c.AccessSecret.IsEqual(other.AccessSecret)
  239. }
  240. func (c *S3FsConfig) checkCredentials() error {
  241. if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
  242. return errors.New("access_key cannot be empty with access_secret not empty")
  243. }
  244. if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
  245. return errors.New("access_secret cannot be empty with access_key not empty")
  246. }
  247. if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
  248. return errors.New("invalid encrypted access_secret")
  249. }
  250. if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
  251. return errors.New("invalid access_secret")
  252. }
  253. return nil
  254. }
  255. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  256. func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  257. if err := c.validate(); err != nil {
  258. return util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
  259. }
  260. if c.AccessSecret.IsPlain() {
  261. c.AccessSecret.SetAdditionalData(additionalData)
  262. err := c.AccessSecret.Encrypt()
  263. if err != nil {
  264. return util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err))
  265. }
  266. }
  267. return nil
  268. }
  269. func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
  270. if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
  271. return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  272. }
  273. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  274. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  275. }
  276. if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
  277. return errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  278. }
  279. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  280. return fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency)
  281. }
  282. return nil
  283. }
  284. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  285. if c.Bucket != other.Bucket {
  286. return false
  287. }
  288. if c.Endpoint != other.Endpoint {
  289. return false
  290. }
  291. return c.Region == other.Region
  292. }
  293. // validate returns an error if the configuration is not valid
  294. func (c *S3FsConfig) validate() error {
  295. if c.AccessSecret == nil {
  296. c.AccessSecret = kms.NewEmptySecret()
  297. }
  298. if c.Bucket == "" {
  299. return errors.New("bucket cannot be empty")
  300. }
  301. // the region may be embedded within the endpoint for some S3 compatible
  302. // object storage, for example B2
  303. if c.Endpoint == "" && c.Region == "" {
  304. return errors.New("region cannot be empty")
  305. }
  306. if err := c.checkCredentials(); err != nil {
  307. return err
  308. }
  309. if c.KeyPrefix != "" {
  310. if strings.HasPrefix(c.KeyPrefix, "/") {
  311. return errors.New("key_prefix cannot start with /")
  312. }
  313. c.KeyPrefix = path.Clean(c.KeyPrefix)
  314. if !strings.HasSuffix(c.KeyPrefix, "/") {
  315. c.KeyPrefix += "/"
  316. }
  317. }
  318. c.StorageClass = strings.TrimSpace(c.StorageClass)
  319. c.ACL = strings.TrimSpace(c.ACL)
  320. return c.checkPartSizeAndConcurrency()
  321. }
  322. // GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
  323. type GCSFsConfig struct {
  324. sdk.BaseGCSFsConfig
  325. Credentials *kms.Secret `json:"credentials,omitempty"`
  326. }
  327. // HideConfidentialData hides confidential data
  328. func (c *GCSFsConfig) HideConfidentialData() {
  329. if c.Credentials != nil {
  330. c.Credentials.Hide()
  331. }
  332. }
  333. // ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text
  334. func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  335. if err := c.validate(); err != nil {
  336. return util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
  337. }
  338. if c.Credentials.IsPlain() {
  339. c.Credentials.SetAdditionalData(additionalData)
  340. err := c.Credentials.Encrypt()
  341. if err != nil {
  342. return util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err))
  343. }
  344. }
  345. return nil
  346. }
  347. func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
  348. if c.Bucket != other.Bucket {
  349. return false
  350. }
  351. if c.KeyPrefix != other.KeyPrefix {
  352. return false
  353. }
  354. if c.AutomaticCredentials != other.AutomaticCredentials {
  355. return false
  356. }
  357. if c.StorageClass != other.StorageClass {
  358. return false
  359. }
  360. if c.ACL != other.ACL {
  361. return false
  362. }
  363. if c.UploadPartSize != other.UploadPartSize {
  364. return false
  365. }
  366. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  367. return false
  368. }
  369. if c.Credentials == nil {
  370. c.Credentials = kms.NewEmptySecret()
  371. }
  372. if other.Credentials == nil {
  373. other.Credentials = kms.NewEmptySecret()
  374. }
  375. return c.Credentials.IsEqual(other.Credentials)
  376. }
  377. func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
  378. return c.Bucket == other.Bucket
  379. }
  380. // validate returns an error if the configuration is not valid
  381. func (c *GCSFsConfig) validate() error {
  382. if c.Credentials == nil || c.AutomaticCredentials == 1 {
  383. c.Credentials = kms.NewEmptySecret()
  384. }
  385. if c.Bucket == "" {
  386. return errors.New("bucket cannot be empty")
  387. }
  388. if c.KeyPrefix != "" {
  389. if strings.HasPrefix(c.KeyPrefix, "/") {
  390. return errors.New("key_prefix cannot start with /")
  391. }
  392. c.KeyPrefix = path.Clean(c.KeyPrefix)
  393. if !strings.HasSuffix(c.KeyPrefix, "/") {
  394. c.KeyPrefix += "/"
  395. }
  396. }
  397. if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
  398. return errors.New("invalid encrypted credentials")
  399. }
  400. if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
  401. return errors.New("invalid credentials")
  402. }
  403. c.StorageClass = strings.TrimSpace(c.StorageClass)
  404. c.ACL = strings.TrimSpace(c.ACL)
  405. if c.UploadPartSize < 0 {
  406. c.UploadPartSize = 0
  407. }
  408. if c.UploadPartMaxTime < 0 {
  409. c.UploadPartMaxTime = 0
  410. }
  411. return nil
  412. }
  413. // AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
  414. type AzBlobFsConfig struct {
  415. sdk.BaseAzBlobFsConfig
  416. // Storage Account Key leave blank to use SAS URL.
  417. // The access key is stored encrypted based on the kms configuration
  418. AccountKey *kms.Secret `json:"account_key,omitempty"`
  419. // Shared access signature URL, leave blank if using account/key
  420. SASURL *kms.Secret `json:"sas_url,omitempty"`
  421. }
  422. // HideConfidentialData hides confidential data
  423. func (c *AzBlobFsConfig) HideConfidentialData() {
  424. if c.AccountKey != nil {
  425. c.AccountKey.Hide()
  426. }
  427. if c.SASURL != nil {
  428. c.SASURL.Hide()
  429. }
  430. }
  431. func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
  432. if c.Container != other.Container {
  433. return false
  434. }
  435. if c.AccountName != other.AccountName {
  436. return false
  437. }
  438. if c.Endpoint != other.Endpoint {
  439. return false
  440. }
  441. if c.SASURL.IsEmpty() {
  442. c.SASURL = kms.NewEmptySecret()
  443. }
  444. if other.SASURL.IsEmpty() {
  445. other.SASURL = kms.NewEmptySecret()
  446. }
  447. if !c.SASURL.IsEqual(other.SASURL) {
  448. return false
  449. }
  450. if c.KeyPrefix != other.KeyPrefix {
  451. return false
  452. }
  453. if c.UploadPartSize != other.UploadPartSize {
  454. return false
  455. }
  456. if c.UploadConcurrency != other.UploadConcurrency {
  457. return false
  458. }
  459. if c.DownloadPartSize != other.DownloadPartSize {
  460. return false
  461. }
  462. if c.DownloadConcurrency != other.DownloadConcurrency {
  463. return false
  464. }
  465. if c.UseEmulator != other.UseEmulator {
  466. return false
  467. }
  468. if c.AccessTier != other.AccessTier {
  469. return false
  470. }
  471. return c.isSecretEqual(other)
  472. }
  473. func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
  474. if c.AccountKey == nil {
  475. c.AccountKey = kms.NewEmptySecret()
  476. }
  477. if other.AccountKey == nil {
  478. other.AccountKey = kms.NewEmptySecret()
  479. }
  480. return c.AccountKey.IsEqual(other.AccountKey)
  481. }
  482. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  483. func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  484. if err := c.validate(); err != nil {
  485. return util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
  486. }
  487. if c.AccountKey.IsPlain() {
  488. c.AccountKey.SetAdditionalData(additionalData)
  489. if err := c.AccountKey.Encrypt(); err != nil {
  490. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err))
  491. }
  492. }
  493. if c.SASURL.IsPlain() {
  494. c.SASURL.SetAdditionalData(additionalData)
  495. if err := c.SASURL.Encrypt(); err != nil {
  496. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err))
  497. }
  498. }
  499. return nil
  500. }
  501. func (c *AzBlobFsConfig) checkCredentials() error {
  502. if c.SASURL.IsPlain() {
  503. _, err := url.Parse(c.SASURL.GetPayload())
  504. return err
  505. }
  506. if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
  507. return errors.New("invalid encrypted sas_url")
  508. }
  509. if !c.SASURL.IsEmpty() {
  510. return nil
  511. }
  512. if c.AccountName == "" || !c.AccountKey.IsValidInput() {
  513. return errors.New("credentials cannot be empty or invalid")
  514. }
  515. if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
  516. return errors.New("invalid encrypted account_key")
  517. }
  518. return nil
  519. }
  520. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  521. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  522. return fmt.Errorf("invalid upload part size: %v", c.UploadPartSize)
  523. }
  524. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  525. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  526. }
  527. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  528. return fmt.Errorf("invalid download part size: %v", c.DownloadPartSize)
  529. }
  530. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  531. return fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency)
  532. }
  533. return nil
  534. }
  535. func (c *AzBlobFsConfig) tryDecrypt() error {
  536. if err := c.AccountKey.TryDecrypt(); err != nil {
  537. return fmt.Errorf("unable to decrypt account key: %w", err)
  538. }
  539. if err := c.SASURL.TryDecrypt(); err != nil {
  540. return fmt.Errorf("unable to decrypt SAS URL: %w", err)
  541. }
  542. return nil
  543. }
  544. func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
  545. if c.AccountName != other.AccountName {
  546. return false
  547. }
  548. if c.Endpoint != other.Endpoint {
  549. return false
  550. }
  551. return c.SASURL.GetPayload() == other.SASURL.GetPayload()
  552. }
  553. // validate returns an error if the configuration is not valid
  554. func (c *AzBlobFsConfig) validate() error {
  555. if c.AccountKey == nil {
  556. c.AccountKey = kms.NewEmptySecret()
  557. }
  558. if c.SASURL == nil {
  559. c.SASURL = kms.NewEmptySecret()
  560. }
  561. // container could be embedded within SAS URL we check this at runtime
  562. if c.SASURL.IsEmpty() && c.Container == "" {
  563. return errors.New("container cannot be empty")
  564. }
  565. if err := c.checkCredentials(); err != nil {
  566. return err
  567. }
  568. if c.KeyPrefix != "" {
  569. if strings.HasPrefix(c.KeyPrefix, "/") {
  570. return errors.New("key_prefix cannot start with /")
  571. }
  572. c.KeyPrefix = path.Clean(c.KeyPrefix)
  573. if !strings.HasSuffix(c.KeyPrefix, "/") {
  574. c.KeyPrefix += "/"
  575. }
  576. }
  577. if err := c.checkPartSizeAndConcurrency(); err != nil {
  578. return err
  579. }
  580. if !util.Contains(validAzAccessTier, c.AccessTier) {
  581. return fmt.Errorf("invalid access tier %q, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
  582. }
  583. return nil
  584. }
  585. // CryptFsConfig defines the configuration to store local files as encrypted
  586. type CryptFsConfig struct {
  587. sdk.OSFsConfig
  588. Passphrase *kms.Secret `json:"passphrase,omitempty"`
  589. }
  590. // HideConfidentialData hides confidential data
  591. func (c *CryptFsConfig) HideConfidentialData() {
  592. if c.Passphrase != nil {
  593. c.Passphrase.Hide()
  594. }
  595. }
  596. func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
  597. if c.Passphrase == nil {
  598. c.Passphrase = kms.NewEmptySecret()
  599. }
  600. if other.Passphrase == nil {
  601. other.Passphrase = kms.NewEmptySecret()
  602. }
  603. return c.Passphrase.IsEqual(other.Passphrase)
  604. }
  605. // ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
  606. func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  607. if err := c.validate(); err != nil {
  608. return util.NewValidationError(fmt.Sprintf("could not validate Crypt fs config: %v", err))
  609. }
  610. if c.Passphrase.IsPlain() {
  611. c.Passphrase.SetAdditionalData(additionalData)
  612. if err := c.Passphrase.Encrypt(); err != nil {
  613. return util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err))
  614. }
  615. }
  616. return nil
  617. }
  618. func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
  619. return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
  620. }
  621. // validate returns an error if the configuration is not valid
  622. func (c *CryptFsConfig) validate() error {
  623. if c.Passphrase == nil || c.Passphrase.IsEmpty() {
  624. return errors.New("invalid passphrase")
  625. }
  626. if !c.Passphrase.IsValidInput() {
  627. return errors.New("passphrase cannot be empty or invalid")
  628. }
  629. if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
  630. return errors.New("invalid encrypted passphrase")
  631. }
  632. return nil
  633. }
  634. // PipeWriter defines a wrapper for pipeat.PipeWriterAt.
  635. type PipeWriter struct {
  636. writer *pipeat.PipeWriterAt
  637. err error
  638. done chan bool
  639. }
  640. // NewPipeWriter initializes a new PipeWriter
  641. func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter {
  642. return &PipeWriter{
  643. writer: w,
  644. err: nil,
  645. done: make(chan bool),
  646. }
  647. }
  648. // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
  649. func (p *PipeWriter) Close() error {
  650. p.writer.Close() //nolint:errcheck // the returned error is always null
  651. <-p.done
  652. return p.err
  653. }
  654. // Done unlocks other goroutines waiting on Close().
  655. // It must be called when the upload ends
  656. func (p *PipeWriter) Done(err error) {
  657. p.err = err
  658. p.done <- true
  659. }
  660. // WriteAt is a wrapper for pipeat WriteAt
  661. func (p *PipeWriter) WriteAt(data []byte, off int64) (int, error) {
  662. return p.writer.WriteAt(data, off)
  663. }
  664. // Write is a wrapper for pipeat Write
  665. func (p *PipeWriter) Write(data []byte) (int, error) {
  666. return p.writer.Write(data)
  667. }
  668. func isEqualityCheckModeValid(mode int) bool {
  669. return mode >= 0 || mode <= 1
  670. }
  671. // isDirectory checks if a path exists and is a directory
  672. func isDirectory(fs Fs, path string) (bool, error) {
  673. fileInfo, err := fs.Stat(path)
  674. if err != nil {
  675. return false, err
  676. }
  677. return fileInfo.IsDir(), err
  678. }
  679. // IsLocalOsFs returns true if fs is a local filesystem implementation
  680. func IsLocalOsFs(fs Fs) bool {
  681. return fs.Name() == osFsName
  682. }
  683. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  684. func IsCryptOsFs(fs Fs) bool {
  685. return fs.Name() == cryptFsName
  686. }
  687. // IsSFTPFs returns true if fs is an SFTP filesystem
  688. func IsSFTPFs(fs Fs) bool {
  689. return strings.HasPrefix(fs.Name(), sftpFsName)
  690. }
  691. // IsHTTPFs returns true if fs is an HTTP filesystem
  692. func IsHTTPFs(fs Fs) bool {
  693. return strings.HasPrefix(fs.Name(), httpFsName)
  694. }
  695. // IsBufferedLocalOrSFTPFs returns true if this is a buffered SFTP or local filesystem
  696. func IsBufferedLocalOrSFTPFs(fs Fs) bool {
  697. if osFs, ok := fs.(*OsFs); ok {
  698. return osFs.writeBufferSize > 0
  699. }
  700. if !IsSFTPFs(fs) {
  701. return false
  702. }
  703. return !fs.IsUploadResumeSupported()
  704. }
  705. // FsOpenReturnsFile returns true if fs.Open returns a *os.File handle
  706. func FsOpenReturnsFile(fs Fs) bool {
  707. if osFs, ok := fs.(*OsFs); ok {
  708. return osFs.readBufferSize == 0
  709. }
  710. if sftpFs, ok := fs.(*SFTPFs); ok {
  711. return sftpFs.config.BufferSize == 0
  712. }
  713. return false
  714. }
  715. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  716. func IsLocalOrSFTPFs(fs Fs) bool {
  717. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  718. }
  719. // HasTruncateSupport returns true if the fs supports truncate files
  720. func HasTruncateSupport(fs Fs) bool {
  721. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  722. }
  723. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  724. func HasImplicitAtomicUploads(fs Fs) bool {
  725. if strings.HasPrefix(fs.Name(), s3fsName) {
  726. return true
  727. }
  728. if strings.HasPrefix(fs.Name(), gcsfsName) {
  729. return true
  730. }
  731. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  732. return true
  733. }
  734. return false
  735. }
  736. // HasOpenRWSupport returns true if the fs can open a file
  737. // for reading and writing at the same time
  738. func HasOpenRWSupport(fs Fs) bool {
  739. if IsLocalOsFs(fs) {
  740. return true
  741. }
  742. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  743. return true
  744. }
  745. return false
  746. }
  747. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  748. func IsLocalOrCryptoFs(fs Fs) bool {
  749. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  750. }
  751. // SetPathPermissions calls fs.Chown.
  752. // It does nothing for local filesystem on windows
  753. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  754. if uid == -1 && gid == -1 {
  755. return
  756. }
  757. if IsLocalOsFs(fs) {
  758. if runtime.GOOS == "windows" {
  759. return
  760. }
  761. }
  762. if err := fs.Chown(path, uid, gid); err != nil {
  763. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  764. }
  765. }
  766. func updateFileInfoModTime(storageID, objectPath string, info *FileInfo) (*FileInfo, error) {
  767. if !plugin.Handler.HasMetadater() {
  768. return info, nil
  769. }
  770. if info.IsDir() {
  771. return info, nil
  772. }
  773. mTime, err := plugin.Handler.GetModificationTime(storageID, ensureAbsPath(objectPath), info.IsDir())
  774. if errors.Is(err, metadata.ErrNoSuchObject) {
  775. return info, nil
  776. }
  777. if err != nil {
  778. return info, err
  779. }
  780. info.modTime = util.GetTimeFromMsecSinceEpoch(mTime)
  781. return info, nil
  782. }
  783. func getFolderModTimes(storageID, dirName string) (map[string]int64, error) {
  784. var err error
  785. modTimes := make(map[string]int64)
  786. if plugin.Handler.HasMetadater() {
  787. modTimes, err = plugin.Handler.GetModificationTimes(storageID, ensureAbsPath(dirName))
  788. if err != nil && !errors.Is(err, metadata.ErrNoSuchObject) {
  789. return modTimes, err
  790. }
  791. }
  792. return modTimes, nil
  793. }
  794. func ensureAbsPath(name string) string {
  795. if path.IsAbs(name) {
  796. return name
  797. }
  798. return path.Join("/", name)
  799. }
  800. func fsMetadataCheck(fs fsMetadataChecker, storageID, keyPrefix string) error {
  801. if !plugin.Handler.HasMetadater() {
  802. return nil
  803. }
  804. limit := 100
  805. from := ""
  806. for {
  807. metadataFolders, err := plugin.Handler.GetMetadataFolders(storageID, from, limit)
  808. if err != nil {
  809. fsLog(fs, logger.LevelError, "unable to get folders: %v", err)
  810. return err
  811. }
  812. for _, folder := range metadataFolders {
  813. from = folder
  814. fsPrefix := folder
  815. if !strings.HasSuffix(folder, "/") {
  816. fsPrefix += "/"
  817. }
  818. if keyPrefix != "" {
  819. if !strings.HasPrefix(fsPrefix, "/"+keyPrefix) {
  820. fsLog(fs, logger.LevelDebug, "skip metadata check for folder %q outside prefix %q",
  821. folder, keyPrefix)
  822. continue
  823. }
  824. }
  825. fsLog(fs, logger.LevelDebug, "check metadata for folder %q", folder)
  826. metadataValues, err := plugin.Handler.GetModificationTimes(storageID, folder)
  827. if err != nil {
  828. fsLog(fs, logger.LevelError, "unable to get modification times for folder %q: %v", folder, err)
  829. return err
  830. }
  831. if len(metadataValues) == 0 {
  832. fsLog(fs, logger.LevelDebug, "no metadata for folder %q", folder)
  833. continue
  834. }
  835. fileNames, err := fs.getFileNamesInPrefix(fsPrefix)
  836. if err != nil {
  837. fsLog(fs, logger.LevelError, "unable to get content for prefix %q: %v", fsPrefix, err)
  838. return err
  839. }
  840. // now check if we have metadata for a missing object
  841. for k := range metadataValues {
  842. if _, ok := fileNames[k]; !ok {
  843. filePath := ensureAbsPath(path.Join(folder, k))
  844. if err = plugin.Handler.RemoveMetadata(storageID, filePath); err != nil {
  845. fsLog(fs, logger.LevelError, "unable to remove metadata for missing file %q: %v", filePath, err)
  846. } else {
  847. fsLog(fs, logger.LevelDebug, "metadata removed for missing file %q", filePath)
  848. }
  849. }
  850. }
  851. }
  852. if len(metadataFolders) < limit {
  853. return nil
  854. }
  855. }
  856. }
  857. func validateOSFsConfig(config *sdk.OSFsConfig) error {
  858. if config.ReadBufferSize < 0 || config.ReadBufferSize > 10 {
  859. return fmt.Errorf("invalid read buffer size must be between 0 and 10 MB")
  860. }
  861. if config.WriteBufferSize < 0 || config.WriteBufferSize > 10 {
  862. return fmt.Errorf("invalid write buffer size must be between 0 and 10 MB")
  863. }
  864. return nil
  865. }
  866. func doCopy(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
  867. if buf == nil {
  868. buf = make([]byte, 32768)
  869. }
  870. for {
  871. nr, er := src.Read(buf)
  872. if nr > 0 {
  873. nw, ew := dst.Write(buf[0:nr])
  874. if nw < 0 || nr < nw {
  875. nw = 0
  876. if ew == nil {
  877. ew = errors.New("invalid write")
  878. }
  879. }
  880. written += int64(nw)
  881. if ew != nil {
  882. err = ew
  883. break
  884. }
  885. if nr != nw {
  886. err = io.ErrShortWrite
  887. break
  888. }
  889. }
  890. if er != nil {
  891. if er != io.EOF {
  892. err = er
  893. }
  894. break
  895. }
  896. }
  897. return written, err
  898. }
  899. func getMountPath(mountPath string) string {
  900. if mountPath == "/" {
  901. return ""
  902. }
  903. return mountPath
  904. }
  905. func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
  906. logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
  907. }