vfs.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "strings"
  26. "time"
  27. "github.com/eikenb/pipeat"
  28. "github.com/pkg/sftp"
  29. "github.com/sftpgo/sdk"
  30. "github.com/sftpgo/sdk/plugin/metadata"
  31. "github.com/drakkan/sftpgo/v2/internal/kms"
  32. "github.com/drakkan/sftpgo/v2/internal/logger"
  33. "github.com/drakkan/sftpgo/v2/internal/plugin"
  34. "github.com/drakkan/sftpgo/v2/internal/util"
  35. )
  36. const (
  37. dirMimeType = "inode/directory"
  38. s3fsName = "S3Fs"
  39. gcsfsName = "GCSFs"
  40. azBlobFsName = "AzureBlobFs"
  41. )
  42. // Additional checks for files
  43. const (
  44. CheckParentDir = 1
  45. )
  46. var (
  47. validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
  48. // ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
  49. ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
  50. // ErrVfsUnsupported defines the error for an unsupported VFS operation
  51. ErrVfsUnsupported = errors.New("not supported")
  52. tempPath string
  53. sftpFingerprints []string
  54. allowSelfConnections int
  55. )
  56. // SetAllowSelfConnections sets the desired behaviour for self connections
  57. func SetAllowSelfConnections(value int) {
  58. allowSelfConnections = value
  59. }
  60. // SetTempPath sets the path for temporary files
  61. func SetTempPath(fsPath string) {
  62. tempPath = fsPath
  63. }
  64. // GetTempPath returns the path for temporary files
  65. func GetTempPath() string {
  66. return tempPath
  67. }
  68. // SetSFTPFingerprints sets the SFTP host key fingerprints
  69. func SetSFTPFingerprints(fp []string) {
  70. sftpFingerprints = fp
  71. }
  72. // Fs defines the interface for filesystem backends
  73. type Fs interface {
  74. Name() string
  75. ConnectionID() string
  76. Stat(name string) (os.FileInfo, error)
  77. Lstat(name string) (os.FileInfo, error)
  78. Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error)
  79. Create(name string, flag, checks int) (File, *PipeWriter, func(), error)
  80. Rename(source, target string) error
  81. Remove(name string, isDir bool) error
  82. Mkdir(name string) error
  83. Symlink(source, target string) error
  84. Chown(name string, uid int, gid int) error
  85. Chmod(name string, mode os.FileMode) error
  86. Chtimes(name string, atime, mtime time.Time, isUploading bool) error
  87. Truncate(name string, size int64) error
  88. ReadDir(dirname string) ([]os.FileInfo, error)
  89. Readlink(name string) (string, error)
  90. IsUploadResumeSupported() bool
  91. IsAtomicUploadSupported() bool
  92. CheckRootPath(username string, uid int, gid int) bool
  93. ResolvePath(virtualPath string) (string, error)
  94. IsNotExist(err error) bool
  95. IsPermission(err error) bool
  96. IsNotSupported(err error) bool
  97. ScanRootDirContents() (int, int64, error)
  98. GetDirSize(dirname string) (int, int64, error)
  99. GetAtomicUploadPath(name string) string
  100. GetRelativePath(name string) string
  101. Walk(root string, walkFn filepath.WalkFunc) error
  102. Join(elem ...string) string
  103. HasVirtualFolders() bool
  104. GetMimeType(name string) (string, error)
  105. GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
  106. CheckMetadata() error
  107. Close() error
  108. }
  109. // FsRealPather is a Fs that implements the RealPath method.
  110. type FsRealPather interface {
  111. Fs
  112. RealPath(p string) (string, error)
  113. }
  114. // fsMetadataChecker is a Fs that implements the getFileNamesInPrefix method.
  115. // This interface is used to abstract metadata consistency checks
  116. type fsMetadataChecker interface {
  117. Fs
  118. getFileNamesInPrefix(fsPrefix string) (map[string]bool, error)
  119. }
  120. // File defines an interface representing a SFTPGo file
  121. type File interface {
  122. io.Reader
  123. io.Writer
  124. io.Closer
  125. io.ReaderAt
  126. io.WriterAt
  127. io.Seeker
  128. Stat() (os.FileInfo, error)
  129. Name() string
  130. Truncate(size int64) error
  131. }
  132. // QuotaCheckResult defines the result for a quota check
  133. type QuotaCheckResult struct {
  134. HasSpace bool
  135. AllowedSize int64
  136. AllowedFiles int
  137. UsedSize int64
  138. UsedFiles int
  139. QuotaSize int64
  140. QuotaFiles int
  141. }
  142. // GetRemainingSize returns the remaining allowed size
  143. func (q *QuotaCheckResult) GetRemainingSize() int64 {
  144. if q.QuotaSize > 0 {
  145. return q.QuotaSize - q.UsedSize
  146. }
  147. return 0
  148. }
  149. // GetRemainingFiles returns the remaining allowed files
  150. func (q *QuotaCheckResult) GetRemainingFiles() int {
  151. if q.QuotaFiles > 0 {
  152. return q.QuotaFiles - q.UsedFiles
  153. }
  154. return 0
  155. }
  156. // S3FsConfig defines the configuration for S3 based filesystem
  157. type S3FsConfig struct {
  158. sdk.BaseS3FsConfig
  159. AccessSecret *kms.Secret `json:"access_secret,omitempty"`
  160. }
  161. // HideConfidentialData hides confidential data
  162. func (c *S3FsConfig) HideConfidentialData() {
  163. if c.AccessSecret != nil {
  164. c.AccessSecret.Hide()
  165. }
  166. }
  167. func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
  168. if c.Bucket != other.Bucket {
  169. return false
  170. }
  171. if c.KeyPrefix != other.KeyPrefix {
  172. return false
  173. }
  174. if c.Region != other.Region {
  175. return false
  176. }
  177. if c.AccessKey != other.AccessKey {
  178. return false
  179. }
  180. if c.RoleARN != other.RoleARN {
  181. return false
  182. }
  183. if c.Endpoint != other.Endpoint {
  184. return false
  185. }
  186. if c.StorageClass != other.StorageClass {
  187. return false
  188. }
  189. if c.ACL != other.ACL {
  190. return false
  191. }
  192. if !c.areMultipartFieldsEqual(other) {
  193. return false
  194. }
  195. if c.ForcePathStyle != other.ForcePathStyle {
  196. return false
  197. }
  198. return c.isSecretEqual(other)
  199. }
  200. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  201. if c.UploadPartSize != other.UploadPartSize {
  202. return false
  203. }
  204. if c.UploadConcurrency != other.UploadConcurrency {
  205. return false
  206. }
  207. if c.DownloadConcurrency != other.DownloadConcurrency {
  208. return false
  209. }
  210. if c.DownloadPartSize != other.DownloadPartSize {
  211. return false
  212. }
  213. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  214. return false
  215. }
  216. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  217. return false
  218. }
  219. return true
  220. }
  221. func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
  222. if c.AccessSecret == nil {
  223. c.AccessSecret = kms.NewEmptySecret()
  224. }
  225. if other.AccessSecret == nil {
  226. other.AccessSecret = kms.NewEmptySecret()
  227. }
  228. return c.AccessSecret.IsEqual(other.AccessSecret)
  229. }
  230. func (c *S3FsConfig) checkCredentials() error {
  231. if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
  232. return errors.New("access_key cannot be empty with access_secret not empty")
  233. }
  234. if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
  235. return errors.New("access_secret cannot be empty with access_key not empty")
  236. }
  237. if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
  238. return errors.New("invalid encrypted access_secret")
  239. }
  240. if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
  241. return errors.New("invalid access_secret")
  242. }
  243. return nil
  244. }
  245. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  246. func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  247. if err := c.validate(); err != nil {
  248. return util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
  249. }
  250. if c.AccessSecret.IsPlain() {
  251. c.AccessSecret.SetAdditionalData(additionalData)
  252. err := c.AccessSecret.Encrypt()
  253. if err != nil {
  254. return util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err))
  255. }
  256. }
  257. return nil
  258. }
  259. func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
  260. if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
  261. return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  262. }
  263. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  264. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  265. }
  266. if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
  267. return errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  268. }
  269. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  270. return fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency)
  271. }
  272. return nil
  273. }
  274. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  275. if c.Bucket != other.Bucket {
  276. return false
  277. }
  278. if c.Endpoint != other.Endpoint {
  279. return false
  280. }
  281. return c.Region == other.Region
  282. }
  283. // validate returns an error if the configuration is not valid
  284. func (c *S3FsConfig) validate() error {
  285. if c.AccessSecret == nil {
  286. c.AccessSecret = kms.NewEmptySecret()
  287. }
  288. if c.Bucket == "" {
  289. return errors.New("bucket cannot be empty")
  290. }
  291. // the region may be embedded within the endpoint for some S3 compatible
  292. // object storage, for example B2
  293. if c.Endpoint == "" && c.Region == "" {
  294. return errors.New("region cannot be empty")
  295. }
  296. if err := c.checkCredentials(); err != nil {
  297. return err
  298. }
  299. if c.KeyPrefix != "" {
  300. if strings.HasPrefix(c.KeyPrefix, "/") {
  301. return errors.New("key_prefix cannot start with /")
  302. }
  303. c.KeyPrefix = path.Clean(c.KeyPrefix)
  304. if !strings.HasSuffix(c.KeyPrefix, "/") {
  305. c.KeyPrefix += "/"
  306. }
  307. }
  308. c.StorageClass = strings.TrimSpace(c.StorageClass)
  309. c.ACL = strings.TrimSpace(c.ACL)
  310. return c.checkPartSizeAndConcurrency()
  311. }
  312. // GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
  313. type GCSFsConfig struct {
  314. sdk.BaseGCSFsConfig
  315. Credentials *kms.Secret `json:"credentials,omitempty"`
  316. }
  317. // HideConfidentialData hides confidential data
  318. func (c *GCSFsConfig) HideConfidentialData() {
  319. if c.Credentials != nil {
  320. c.Credentials.Hide()
  321. }
  322. }
  323. // ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text
  324. func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  325. if err := c.validate(); err != nil {
  326. return util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
  327. }
  328. if c.Credentials.IsPlain() {
  329. c.Credentials.SetAdditionalData(additionalData)
  330. err := c.Credentials.Encrypt()
  331. if err != nil {
  332. return util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err))
  333. }
  334. }
  335. return nil
  336. }
  337. func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
  338. if c.Bucket != other.Bucket {
  339. return false
  340. }
  341. if c.KeyPrefix != other.KeyPrefix {
  342. return false
  343. }
  344. if c.AutomaticCredentials != other.AutomaticCredentials {
  345. return false
  346. }
  347. if c.StorageClass != other.StorageClass {
  348. return false
  349. }
  350. if c.ACL != other.ACL {
  351. return false
  352. }
  353. if c.Credentials == nil {
  354. c.Credentials = kms.NewEmptySecret()
  355. }
  356. if other.Credentials == nil {
  357. other.Credentials = kms.NewEmptySecret()
  358. }
  359. return c.Credentials.IsEqual(other.Credentials)
  360. }
  361. func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
  362. return c.Bucket == other.Bucket
  363. }
  364. // validate returns an error if the configuration is not valid
  365. func (c *GCSFsConfig) validate() error {
  366. if c.Credentials == nil || c.AutomaticCredentials == 1 {
  367. c.Credentials = kms.NewEmptySecret()
  368. }
  369. if c.Bucket == "" {
  370. return errors.New("bucket cannot be empty")
  371. }
  372. if c.KeyPrefix != "" {
  373. if strings.HasPrefix(c.KeyPrefix, "/") {
  374. return errors.New("key_prefix cannot start with /")
  375. }
  376. c.KeyPrefix = path.Clean(c.KeyPrefix)
  377. if !strings.HasSuffix(c.KeyPrefix, "/") {
  378. c.KeyPrefix += "/"
  379. }
  380. }
  381. if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
  382. return errors.New("invalid encrypted credentials")
  383. }
  384. if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
  385. return errors.New("invalid credentials")
  386. }
  387. c.StorageClass = strings.TrimSpace(c.StorageClass)
  388. c.ACL = strings.TrimSpace(c.ACL)
  389. return nil
  390. }
  391. // AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
  392. type AzBlobFsConfig struct {
  393. sdk.BaseAzBlobFsConfig
  394. // Storage Account Key leave blank to use SAS URL.
  395. // The access key is stored encrypted based on the kms configuration
  396. AccountKey *kms.Secret `json:"account_key,omitempty"`
  397. // Shared access signature URL, leave blank if using account/key
  398. SASURL *kms.Secret `json:"sas_url,omitempty"`
  399. }
  400. // HideConfidentialData hides confidential data
  401. func (c *AzBlobFsConfig) HideConfidentialData() {
  402. if c.AccountKey != nil {
  403. c.AccountKey.Hide()
  404. }
  405. if c.SASURL != nil {
  406. c.SASURL.Hide()
  407. }
  408. }
  409. func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
  410. if c.Container != other.Container {
  411. return false
  412. }
  413. if c.AccountName != other.AccountName {
  414. return false
  415. }
  416. if c.Endpoint != other.Endpoint {
  417. return false
  418. }
  419. if c.SASURL.IsEmpty() {
  420. c.SASURL = kms.NewEmptySecret()
  421. }
  422. if other.SASURL.IsEmpty() {
  423. other.SASURL = kms.NewEmptySecret()
  424. }
  425. if !c.SASURL.IsEqual(other.SASURL) {
  426. return false
  427. }
  428. if c.KeyPrefix != other.KeyPrefix {
  429. return false
  430. }
  431. if c.UploadPartSize != other.UploadPartSize {
  432. return false
  433. }
  434. if c.UploadConcurrency != other.UploadConcurrency {
  435. return false
  436. }
  437. if c.DownloadPartSize != other.DownloadPartSize {
  438. return false
  439. }
  440. if c.DownloadConcurrency != other.DownloadConcurrency {
  441. return false
  442. }
  443. if c.UseEmulator != other.UseEmulator {
  444. return false
  445. }
  446. if c.AccessTier != other.AccessTier {
  447. return false
  448. }
  449. return c.isSecretEqual(other)
  450. }
  451. func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
  452. if c.AccountKey == nil {
  453. c.AccountKey = kms.NewEmptySecret()
  454. }
  455. if other.AccountKey == nil {
  456. other.AccountKey = kms.NewEmptySecret()
  457. }
  458. return c.AccountKey.IsEqual(other.AccountKey)
  459. }
  460. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  461. func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  462. if err := c.validate(); err != nil {
  463. return util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
  464. }
  465. if c.AccountKey.IsPlain() {
  466. c.AccountKey.SetAdditionalData(additionalData)
  467. if err := c.AccountKey.Encrypt(); err != nil {
  468. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err))
  469. }
  470. }
  471. if c.SASURL.IsPlain() {
  472. c.SASURL.SetAdditionalData(additionalData)
  473. if err := c.SASURL.Encrypt(); err != nil {
  474. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err))
  475. }
  476. }
  477. return nil
  478. }
  479. func (c *AzBlobFsConfig) checkCredentials() error {
  480. if c.SASURL.IsPlain() {
  481. _, err := url.Parse(c.SASURL.GetPayload())
  482. return err
  483. }
  484. if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
  485. return errors.New("invalid encrypted sas_url")
  486. }
  487. if !c.SASURL.IsEmpty() {
  488. return nil
  489. }
  490. if c.AccountName == "" || !c.AccountKey.IsValidInput() {
  491. return errors.New("credentials cannot be empty or invalid")
  492. }
  493. if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
  494. return errors.New("invalid encrypted account_key")
  495. }
  496. return nil
  497. }
  498. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  499. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  500. return fmt.Errorf("invalid upload part size: %v", c.UploadPartSize)
  501. }
  502. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  503. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  504. }
  505. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  506. return fmt.Errorf("invalid download part size: %v", c.DownloadPartSize)
  507. }
  508. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  509. return fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency)
  510. }
  511. return nil
  512. }
  513. func (c *AzBlobFsConfig) tryDecrypt() error {
  514. if err := c.AccountKey.TryDecrypt(); err != nil {
  515. return fmt.Errorf("unable to decrypt account key: %w", err)
  516. }
  517. if err := c.SASURL.TryDecrypt(); err != nil {
  518. return fmt.Errorf("unable to decrypt SAS URL: %w", err)
  519. }
  520. return nil
  521. }
  522. func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
  523. if c.AccountName != other.AccountName {
  524. return false
  525. }
  526. if c.Endpoint != other.Endpoint {
  527. return false
  528. }
  529. return c.SASURL.GetPayload() == other.SASURL.GetPayload()
  530. }
  531. // validate returns an error if the configuration is not valid
  532. func (c *AzBlobFsConfig) validate() error {
  533. if c.AccountKey == nil {
  534. c.AccountKey = kms.NewEmptySecret()
  535. }
  536. if c.SASURL == nil {
  537. c.SASURL = kms.NewEmptySecret()
  538. }
  539. // container could be embedded within SAS URL we check this at runtime
  540. if c.SASURL.IsEmpty() && c.Container == "" {
  541. return errors.New("container cannot be empty")
  542. }
  543. if err := c.checkCredentials(); err != nil {
  544. return err
  545. }
  546. if c.KeyPrefix != "" {
  547. if strings.HasPrefix(c.KeyPrefix, "/") {
  548. return errors.New("key_prefix cannot start with /")
  549. }
  550. c.KeyPrefix = path.Clean(c.KeyPrefix)
  551. if !strings.HasSuffix(c.KeyPrefix, "/") {
  552. c.KeyPrefix += "/"
  553. }
  554. }
  555. if err := c.checkPartSizeAndConcurrency(); err != nil {
  556. return err
  557. }
  558. if !util.Contains(validAzAccessTier, c.AccessTier) {
  559. return fmt.Errorf("invalid access tier %#v, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
  560. }
  561. return nil
  562. }
  563. // CryptFsConfig defines the configuration to store local files as encrypted
  564. type CryptFsConfig struct {
  565. Passphrase *kms.Secret `json:"passphrase,omitempty"`
  566. }
  567. // HideConfidentialData hides confidential data
  568. func (c *CryptFsConfig) HideConfidentialData() {
  569. if c.Passphrase != nil {
  570. c.Passphrase.Hide()
  571. }
  572. }
  573. func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
  574. if c.Passphrase == nil {
  575. c.Passphrase = kms.NewEmptySecret()
  576. }
  577. if other.Passphrase == nil {
  578. other.Passphrase = kms.NewEmptySecret()
  579. }
  580. return c.Passphrase.IsEqual(other.Passphrase)
  581. }
  582. // ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
  583. func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  584. if err := c.validate(); err != nil {
  585. return util.NewValidationError(fmt.Sprintf("could not validate Crypt fs config: %v", err))
  586. }
  587. if c.Passphrase.IsPlain() {
  588. c.Passphrase.SetAdditionalData(additionalData)
  589. if err := c.Passphrase.Encrypt(); err != nil {
  590. return util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err))
  591. }
  592. }
  593. return nil
  594. }
  595. func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
  596. return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
  597. }
  598. // validate returns an error if the configuration is not valid
  599. func (c *CryptFsConfig) validate() error {
  600. if c.Passphrase == nil || c.Passphrase.IsEmpty() {
  601. return errors.New("invalid passphrase")
  602. }
  603. if !c.Passphrase.IsValidInput() {
  604. return errors.New("passphrase cannot be empty or invalid")
  605. }
  606. if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
  607. return errors.New("invalid encrypted passphrase")
  608. }
  609. return nil
  610. }
  611. // PipeWriter defines a wrapper for pipeat.PipeWriterAt.
  612. type PipeWriter struct {
  613. writer *pipeat.PipeWriterAt
  614. err error
  615. done chan bool
  616. }
  617. // NewPipeWriter initializes a new PipeWriter
  618. func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter {
  619. return &PipeWriter{
  620. writer: w,
  621. err: nil,
  622. done: make(chan bool),
  623. }
  624. }
  625. // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
  626. func (p *PipeWriter) Close() error {
  627. p.writer.Close() //nolint:errcheck // the returned error is always null
  628. <-p.done
  629. return p.err
  630. }
  631. // Done unlocks other goroutines waiting on Close().
  632. // It must be called when the upload ends
  633. func (p *PipeWriter) Done(err error) {
  634. p.err = err
  635. p.done <- true
  636. }
  637. // WriteAt is a wrapper for pipeat WriteAt
  638. func (p *PipeWriter) WriteAt(data []byte, off int64) (int, error) {
  639. return p.writer.WriteAt(data, off)
  640. }
  641. // Write is a wrapper for pipeat Write
  642. func (p *PipeWriter) Write(data []byte) (int, error) {
  643. return p.writer.Write(data)
  644. }
  645. func isEqualityCheckModeValid(mode int) bool {
  646. return mode >= 0 || mode <= 1
  647. }
  648. // isDirectory checks if a path exists and is a directory
  649. func isDirectory(fs Fs, path string) (bool, error) {
  650. fileInfo, err := fs.Stat(path)
  651. if err != nil {
  652. return false, err
  653. }
  654. return fileInfo.IsDir(), err
  655. }
  656. // IsLocalOsFs returns true if fs is a local filesystem implementation
  657. func IsLocalOsFs(fs Fs) bool {
  658. return fs.Name() == osFsName
  659. }
  660. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  661. func IsCryptOsFs(fs Fs) bool {
  662. return fs.Name() == cryptFsName
  663. }
  664. // IsSFTPFs returns true if fs is an SFTP filesystem
  665. func IsSFTPFs(fs Fs) bool {
  666. return strings.HasPrefix(fs.Name(), sftpFsName)
  667. }
  668. // IsHTTPFs returns true if fs is an HTTP filesystem
  669. func IsHTTPFs(fs Fs) bool {
  670. return strings.HasPrefix(fs.Name(), httpFsName)
  671. }
  672. // IsBufferedSFTPFs returns true if this is a buffered SFTP filesystem
  673. func IsBufferedSFTPFs(fs Fs) bool {
  674. if !IsSFTPFs(fs) {
  675. return false
  676. }
  677. return !fs.IsUploadResumeSupported()
  678. }
  679. // IsLocalOrUnbufferedSFTPFs returns true if fs is local or SFTP with no buffer
  680. func IsLocalOrUnbufferedSFTPFs(fs Fs) bool {
  681. if IsLocalOsFs(fs) {
  682. return true
  683. }
  684. if IsSFTPFs(fs) {
  685. return fs.IsUploadResumeSupported()
  686. }
  687. return false
  688. }
  689. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  690. func IsLocalOrSFTPFs(fs Fs) bool {
  691. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  692. }
  693. // HasTruncateSupport returns true if the fs supports truncate files
  694. func HasTruncateSupport(fs Fs) bool {
  695. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  696. }
  697. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  698. func HasImplicitAtomicUploads(fs Fs) bool {
  699. if strings.HasPrefix(fs.Name(), s3fsName) {
  700. return true
  701. }
  702. if strings.HasPrefix(fs.Name(), gcsfsName) {
  703. return true
  704. }
  705. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  706. return true
  707. }
  708. return false
  709. }
  710. // HasOpenRWSupport returns true if the fs can open a file
  711. // for reading and writing at the same time
  712. func HasOpenRWSupport(fs Fs) bool {
  713. if IsLocalOsFs(fs) {
  714. return true
  715. }
  716. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  717. return true
  718. }
  719. return false
  720. }
  721. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  722. func IsLocalOrCryptoFs(fs Fs) bool {
  723. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  724. }
  725. // SetPathPermissions calls fs.Chown.
  726. // It does nothing for local filesystem on windows
  727. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  728. if uid == -1 && gid == -1 {
  729. return
  730. }
  731. if IsLocalOsFs(fs) {
  732. if runtime.GOOS == "windows" {
  733. return
  734. }
  735. }
  736. if err := fs.Chown(path, uid, gid); err != nil {
  737. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  738. }
  739. }
  740. func updateFileInfoModTime(storageID, objectPath string, info *FileInfo) (*FileInfo, error) {
  741. if !plugin.Handler.HasMetadater() {
  742. return info, nil
  743. }
  744. if info.IsDir() {
  745. return info, nil
  746. }
  747. mTime, err := plugin.Handler.GetModificationTime(storageID, ensureAbsPath(objectPath), info.IsDir())
  748. if errors.Is(err, metadata.ErrNoSuchObject) {
  749. return info, nil
  750. }
  751. if err != nil {
  752. return info, err
  753. }
  754. info.modTime = util.GetTimeFromMsecSinceEpoch(mTime)
  755. return info, nil
  756. }
  757. func getFolderModTimes(storageID, dirName string) (map[string]int64, error) {
  758. var err error
  759. modTimes := make(map[string]int64)
  760. if plugin.Handler.HasMetadater() {
  761. modTimes, err = plugin.Handler.GetModificationTimes(storageID, ensureAbsPath(dirName))
  762. if err != nil && !errors.Is(err, metadata.ErrNoSuchObject) {
  763. return modTimes, err
  764. }
  765. }
  766. return modTimes, nil
  767. }
  768. func ensureAbsPath(name string) string {
  769. if path.IsAbs(name) {
  770. return name
  771. }
  772. return path.Join("/", name)
  773. }
  774. func fsMetadataCheck(fs fsMetadataChecker, storageID, keyPrefix string) error {
  775. if !plugin.Handler.HasMetadater() {
  776. return nil
  777. }
  778. limit := 100
  779. from := ""
  780. for {
  781. metadataFolders, err := plugin.Handler.GetMetadataFolders(storageID, from, limit)
  782. if err != nil {
  783. fsLog(fs, logger.LevelError, "unable to get folders: %v", err)
  784. return err
  785. }
  786. for _, folder := range metadataFolders {
  787. from = folder
  788. fsPrefix := folder
  789. if !strings.HasSuffix(folder, "/") {
  790. fsPrefix += "/"
  791. }
  792. if keyPrefix != "" {
  793. if !strings.HasPrefix(fsPrefix, "/"+keyPrefix) {
  794. fsLog(fs, logger.LevelDebug, "skip metadata check for folder %#v outside prefix %#v",
  795. folder, keyPrefix)
  796. continue
  797. }
  798. }
  799. fsLog(fs, logger.LevelDebug, "check metadata for folder %#v", folder)
  800. metadataValues, err := plugin.Handler.GetModificationTimes(storageID, folder)
  801. if err != nil {
  802. fsLog(fs, logger.LevelError, "unable to get modification times for folder %#v: %v", folder, err)
  803. return err
  804. }
  805. if len(metadataValues) == 0 {
  806. fsLog(fs, logger.LevelDebug, "no metadata for folder %#v", folder)
  807. continue
  808. }
  809. fileNames, err := fs.getFileNamesInPrefix(fsPrefix)
  810. if err != nil {
  811. fsLog(fs, logger.LevelError, "unable to get content for prefix %#v: %v", fsPrefix, err)
  812. return err
  813. }
  814. // now check if we have metadata for a missing object
  815. for k := range metadataValues {
  816. if _, ok := fileNames[k]; !ok {
  817. filePath := ensureAbsPath(path.Join(folder, k))
  818. if err = plugin.Handler.RemoveMetadata(storageID, filePath); err != nil {
  819. fsLog(fs, logger.LevelError, "unable to remove metadata for missing file %#v: %v", filePath, err)
  820. } else {
  821. fsLog(fs, logger.LevelDebug, "metadata removed for missing file %#v", filePath)
  822. }
  823. }
  824. }
  825. }
  826. if len(metadataFolders) < limit {
  827. return nil
  828. }
  829. }
  830. }
  831. func getMountPath(mountPath string) string {
  832. if mountPath == "/" {
  833. return ""
  834. }
  835. return mountPath
  836. }
  837. func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
  838. logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
  839. }