vfs.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "strings"
  26. "time"
  27. "github.com/eikenb/pipeat"
  28. "github.com/pkg/sftp"
  29. "github.com/sftpgo/sdk"
  30. "github.com/sftpgo/sdk/plugin/metadata"
  31. "github.com/drakkan/sftpgo/v2/internal/kms"
  32. "github.com/drakkan/sftpgo/v2/internal/logger"
  33. "github.com/drakkan/sftpgo/v2/internal/plugin"
  34. "github.com/drakkan/sftpgo/v2/internal/util"
  35. )
  36. const dirMimeType = "inode/directory"
  37. var (
  38. validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
  39. // ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
  40. ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
  41. // ErrVfsUnsupported defines the error for an unsupported VFS operation
  42. ErrVfsUnsupported = errors.New("not supported")
  43. tempPath string
  44. sftpFingerprints []string
  45. allowSelfConnections int
  46. )
  47. // SetAllowSelfConnections sets the desired behaviour for self connections
  48. func SetAllowSelfConnections(value int) {
  49. allowSelfConnections = value
  50. }
  51. // SetTempPath sets the path for temporary files
  52. func SetTempPath(fsPath string) {
  53. tempPath = fsPath
  54. }
  55. // GetTempPath returns the path for temporary files
  56. func GetTempPath() string {
  57. return tempPath
  58. }
  59. // SetSFTPFingerprints sets the SFTP host key fingerprints
  60. func SetSFTPFingerprints(fp []string) {
  61. sftpFingerprints = fp
  62. }
  63. // Fs defines the interface for filesystem backends
  64. type Fs interface {
  65. Name() string
  66. ConnectionID() string
  67. Stat(name string) (os.FileInfo, error)
  68. Lstat(name string) (os.FileInfo, error)
  69. Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error)
  70. Create(name string, flag int) (File, *PipeWriter, func(), error)
  71. Rename(source, target string) error
  72. Remove(name string, isDir bool) error
  73. Mkdir(name string) error
  74. Symlink(source, target string) error
  75. Chown(name string, uid int, gid int) error
  76. Chmod(name string, mode os.FileMode) error
  77. Chtimes(name string, atime, mtime time.Time, isUploading bool) error
  78. Truncate(name string, size int64) error
  79. ReadDir(dirname string) ([]os.FileInfo, error)
  80. Readlink(name string) (string, error)
  81. IsUploadResumeSupported() bool
  82. IsAtomicUploadSupported() bool
  83. CheckRootPath(username string, uid int, gid int) bool
  84. ResolvePath(virtualPath string) (string, error)
  85. IsNotExist(err error) bool
  86. IsPermission(err error) bool
  87. IsNotSupported(err error) bool
  88. ScanRootDirContents() (int, int64, error)
  89. GetDirSize(dirname string) (int, int64, error)
  90. GetAtomicUploadPath(name string) string
  91. GetRelativePath(name string) string
  92. Walk(root string, walkFn filepath.WalkFunc) error
  93. Join(elem ...string) string
  94. HasVirtualFolders() bool
  95. GetMimeType(name string) (string, error)
  96. GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
  97. CheckMetadata() error
  98. Close() error
  99. }
  100. // FsRealPather is a Fs that implements the RealPath method.
  101. type FsRealPather interface {
  102. Fs
  103. RealPath(p string) (string, error)
  104. }
  105. // fsMetadataChecker is a Fs that implements the getFileNamesInPrefix method.
  106. // This interface is used to abstract metadata consistency checks
  107. type fsMetadataChecker interface {
  108. Fs
  109. getFileNamesInPrefix(fsPrefix string) (map[string]bool, error)
  110. }
  111. // File defines an interface representing a SFTPGo file
  112. type File interface {
  113. io.Reader
  114. io.Writer
  115. io.Closer
  116. io.ReaderAt
  117. io.WriterAt
  118. io.Seeker
  119. Stat() (os.FileInfo, error)
  120. Name() string
  121. Truncate(size int64) error
  122. }
  123. // QuotaCheckResult defines the result for a quota check
  124. type QuotaCheckResult struct {
  125. HasSpace bool
  126. AllowedSize int64
  127. AllowedFiles int
  128. UsedSize int64
  129. UsedFiles int
  130. QuotaSize int64
  131. QuotaFiles int
  132. }
  133. // GetRemainingSize returns the remaining allowed size
  134. func (q *QuotaCheckResult) GetRemainingSize() int64 {
  135. if q.QuotaSize > 0 {
  136. return q.QuotaSize - q.UsedSize
  137. }
  138. return 0
  139. }
  140. // GetRemainingFiles returns the remaining allowed files
  141. func (q *QuotaCheckResult) GetRemainingFiles() int {
  142. if q.QuotaFiles > 0 {
  143. return q.QuotaFiles - q.UsedFiles
  144. }
  145. return 0
  146. }
  147. // S3FsConfig defines the configuration for S3 based filesystem
  148. type S3FsConfig struct {
  149. sdk.BaseS3FsConfig
  150. AccessSecret *kms.Secret `json:"access_secret,omitempty"`
  151. }
  152. // HideConfidentialData hides confidential data
  153. func (c *S3FsConfig) HideConfidentialData() {
  154. if c.AccessSecret != nil {
  155. c.AccessSecret.Hide()
  156. }
  157. }
  158. func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
  159. if c.Bucket != other.Bucket {
  160. return false
  161. }
  162. if c.KeyPrefix != other.KeyPrefix {
  163. return false
  164. }
  165. if c.Region != other.Region {
  166. return false
  167. }
  168. if c.AccessKey != other.AccessKey {
  169. return false
  170. }
  171. if c.RoleARN != other.RoleARN {
  172. return false
  173. }
  174. if c.Endpoint != other.Endpoint {
  175. return false
  176. }
  177. if c.StorageClass != other.StorageClass {
  178. return false
  179. }
  180. if c.ACL != other.ACL {
  181. return false
  182. }
  183. if !c.areMultipartFieldsEqual(other) {
  184. return false
  185. }
  186. if c.ForcePathStyle != other.ForcePathStyle {
  187. return false
  188. }
  189. return c.isSecretEqual(other)
  190. }
  191. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  192. if c.UploadPartSize != other.UploadPartSize {
  193. return false
  194. }
  195. if c.UploadConcurrency != other.UploadConcurrency {
  196. return false
  197. }
  198. if c.DownloadConcurrency != other.DownloadConcurrency {
  199. return false
  200. }
  201. if c.DownloadPartSize != other.DownloadPartSize {
  202. return false
  203. }
  204. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  205. return false
  206. }
  207. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  208. return false
  209. }
  210. return true
  211. }
  212. func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
  213. if c.AccessSecret == nil {
  214. c.AccessSecret = kms.NewEmptySecret()
  215. }
  216. if other.AccessSecret == nil {
  217. other.AccessSecret = kms.NewEmptySecret()
  218. }
  219. return c.AccessSecret.IsEqual(other.AccessSecret)
  220. }
  221. func (c *S3FsConfig) checkCredentials() error {
  222. if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
  223. return errors.New("access_key cannot be empty with access_secret not empty")
  224. }
  225. if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
  226. return errors.New("access_secret cannot be empty with access_key not empty")
  227. }
  228. if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
  229. return errors.New("invalid encrypted access_secret")
  230. }
  231. if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
  232. return errors.New("invalid access_secret")
  233. }
  234. return nil
  235. }
  236. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  237. func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  238. if err := c.validate(); err != nil {
  239. return util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
  240. }
  241. if c.AccessSecret.IsPlain() {
  242. c.AccessSecret.SetAdditionalData(additionalData)
  243. err := c.AccessSecret.Encrypt()
  244. if err != nil {
  245. return util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err))
  246. }
  247. }
  248. return nil
  249. }
  250. func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
  251. if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
  252. return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  253. }
  254. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  255. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  256. }
  257. if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
  258. return errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  259. }
  260. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  261. return fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency)
  262. }
  263. return nil
  264. }
  265. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  266. if c.Bucket != other.Bucket {
  267. return false
  268. }
  269. if c.Endpoint != other.Endpoint {
  270. return false
  271. }
  272. return c.Region == other.Region
  273. }
  274. // validate returns an error if the configuration is not valid
  275. func (c *S3FsConfig) validate() error {
  276. if c.AccessSecret == nil {
  277. c.AccessSecret = kms.NewEmptySecret()
  278. }
  279. if c.Bucket == "" {
  280. return errors.New("bucket cannot be empty")
  281. }
  282. // the region may be embedded within the endpoint for some S3 compatible
  283. // object storage, for example B2
  284. if c.Endpoint == "" && c.Region == "" {
  285. return errors.New("region cannot be empty")
  286. }
  287. if err := c.checkCredentials(); err != nil {
  288. return err
  289. }
  290. if c.KeyPrefix != "" {
  291. if strings.HasPrefix(c.KeyPrefix, "/") {
  292. return errors.New("key_prefix cannot start with /")
  293. }
  294. c.KeyPrefix = path.Clean(c.KeyPrefix)
  295. if !strings.HasSuffix(c.KeyPrefix, "/") {
  296. c.KeyPrefix += "/"
  297. }
  298. }
  299. c.StorageClass = strings.TrimSpace(c.StorageClass)
  300. c.ACL = strings.TrimSpace(c.ACL)
  301. return c.checkPartSizeAndConcurrency()
  302. }
  303. // GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
  304. type GCSFsConfig struct {
  305. sdk.BaseGCSFsConfig
  306. Credentials *kms.Secret `json:"credentials,omitempty"`
  307. }
  308. // HideConfidentialData hides confidential data
  309. func (c *GCSFsConfig) HideConfidentialData() {
  310. if c.Credentials != nil {
  311. c.Credentials.Hide()
  312. }
  313. }
  314. // ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text
  315. func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  316. if err := c.validate(); err != nil {
  317. return util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
  318. }
  319. if c.Credentials.IsPlain() {
  320. c.Credentials.SetAdditionalData(additionalData)
  321. err := c.Credentials.Encrypt()
  322. if err != nil {
  323. return util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err))
  324. }
  325. }
  326. return nil
  327. }
  328. func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
  329. if c.Bucket != other.Bucket {
  330. return false
  331. }
  332. if c.KeyPrefix != other.KeyPrefix {
  333. return false
  334. }
  335. if c.AutomaticCredentials != other.AutomaticCredentials {
  336. return false
  337. }
  338. if c.StorageClass != other.StorageClass {
  339. return false
  340. }
  341. if c.ACL != other.ACL {
  342. return false
  343. }
  344. if c.Credentials == nil {
  345. c.Credentials = kms.NewEmptySecret()
  346. }
  347. if other.Credentials == nil {
  348. other.Credentials = kms.NewEmptySecret()
  349. }
  350. return c.Credentials.IsEqual(other.Credentials)
  351. }
  352. func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
  353. return c.Bucket == other.Bucket
  354. }
  355. // validate returns an error if the configuration is not valid
  356. func (c *GCSFsConfig) validate() error {
  357. if c.Credentials == nil || c.AutomaticCredentials == 1 {
  358. c.Credentials = kms.NewEmptySecret()
  359. }
  360. if c.Bucket == "" {
  361. return errors.New("bucket cannot be empty")
  362. }
  363. if c.KeyPrefix != "" {
  364. if strings.HasPrefix(c.KeyPrefix, "/") {
  365. return errors.New("key_prefix cannot start with /")
  366. }
  367. c.KeyPrefix = path.Clean(c.KeyPrefix)
  368. if !strings.HasSuffix(c.KeyPrefix, "/") {
  369. c.KeyPrefix += "/"
  370. }
  371. }
  372. if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
  373. return errors.New("invalid encrypted credentials")
  374. }
  375. if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
  376. return errors.New("invalid credentials")
  377. }
  378. c.StorageClass = strings.TrimSpace(c.StorageClass)
  379. c.ACL = strings.TrimSpace(c.ACL)
  380. return nil
  381. }
  382. // AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
  383. type AzBlobFsConfig struct {
  384. sdk.BaseAzBlobFsConfig
  385. // Storage Account Key leave blank to use SAS URL.
  386. // The access key is stored encrypted based on the kms configuration
  387. AccountKey *kms.Secret `json:"account_key,omitempty"`
  388. // Shared access signature URL, leave blank if using account/key
  389. SASURL *kms.Secret `json:"sas_url,omitempty"`
  390. }
  391. // HideConfidentialData hides confidential data
  392. func (c *AzBlobFsConfig) HideConfidentialData() {
  393. if c.AccountKey != nil {
  394. c.AccountKey.Hide()
  395. }
  396. if c.SASURL != nil {
  397. c.SASURL.Hide()
  398. }
  399. }
  400. func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
  401. if c.Container != other.Container {
  402. return false
  403. }
  404. if c.AccountName != other.AccountName {
  405. return false
  406. }
  407. if c.Endpoint != other.Endpoint {
  408. return false
  409. }
  410. if c.SASURL.IsEmpty() {
  411. c.SASURL = kms.NewEmptySecret()
  412. }
  413. if other.SASURL.IsEmpty() {
  414. other.SASURL = kms.NewEmptySecret()
  415. }
  416. if !c.SASURL.IsEqual(other.SASURL) {
  417. return false
  418. }
  419. if c.KeyPrefix != other.KeyPrefix {
  420. return false
  421. }
  422. if c.UploadPartSize != other.UploadPartSize {
  423. return false
  424. }
  425. if c.UploadConcurrency != other.UploadConcurrency {
  426. return false
  427. }
  428. if c.DownloadPartSize != other.DownloadPartSize {
  429. return false
  430. }
  431. if c.DownloadConcurrency != other.DownloadConcurrency {
  432. return false
  433. }
  434. if c.UseEmulator != other.UseEmulator {
  435. return false
  436. }
  437. if c.AccessTier != other.AccessTier {
  438. return false
  439. }
  440. return c.isSecretEqual(other)
  441. }
  442. func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
  443. if c.AccountKey == nil {
  444. c.AccountKey = kms.NewEmptySecret()
  445. }
  446. if other.AccountKey == nil {
  447. other.AccountKey = kms.NewEmptySecret()
  448. }
  449. return c.AccountKey.IsEqual(other.AccountKey)
  450. }
  451. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  452. func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  453. if err := c.validate(); err != nil {
  454. return util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
  455. }
  456. if c.AccountKey.IsPlain() {
  457. c.AccountKey.SetAdditionalData(additionalData)
  458. if err := c.AccountKey.Encrypt(); err != nil {
  459. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err))
  460. }
  461. }
  462. if c.SASURL.IsPlain() {
  463. c.SASURL.SetAdditionalData(additionalData)
  464. if err := c.SASURL.Encrypt(); err != nil {
  465. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err))
  466. }
  467. }
  468. return nil
  469. }
  470. func (c *AzBlobFsConfig) checkCredentials() error {
  471. if c.SASURL.IsPlain() {
  472. _, err := url.Parse(c.SASURL.GetPayload())
  473. return err
  474. }
  475. if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
  476. return errors.New("invalid encrypted sas_url")
  477. }
  478. if !c.SASURL.IsEmpty() {
  479. return nil
  480. }
  481. if c.AccountName == "" || !c.AccountKey.IsValidInput() {
  482. return errors.New("credentials cannot be empty or invalid")
  483. }
  484. if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
  485. return errors.New("invalid encrypted account_key")
  486. }
  487. return nil
  488. }
  489. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  490. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  491. return fmt.Errorf("invalid upload part size: %v", c.UploadPartSize)
  492. }
  493. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  494. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  495. }
  496. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  497. return fmt.Errorf("invalid download part size: %v", c.DownloadPartSize)
  498. }
  499. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  500. return fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency)
  501. }
  502. return nil
  503. }
  504. func (c *AzBlobFsConfig) tryDecrypt() error {
  505. if err := c.AccountKey.TryDecrypt(); err != nil {
  506. return fmt.Errorf("unable to decrypt account key: %w", err)
  507. }
  508. if err := c.SASURL.TryDecrypt(); err != nil {
  509. return fmt.Errorf("unable to decrypt SAS URL: %w", err)
  510. }
  511. return nil
  512. }
  513. func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
  514. if c.AccountName != other.AccountName {
  515. return false
  516. }
  517. if c.Endpoint != other.Endpoint {
  518. return false
  519. }
  520. return c.SASURL.GetPayload() == other.SASURL.GetPayload()
  521. }
  522. // validate returns an error if the configuration is not valid
  523. func (c *AzBlobFsConfig) validate() error {
  524. if c.AccountKey == nil {
  525. c.AccountKey = kms.NewEmptySecret()
  526. }
  527. if c.SASURL == nil {
  528. c.SASURL = kms.NewEmptySecret()
  529. }
  530. // container could be embedded within SAS URL we check this at runtime
  531. if c.SASURL.IsEmpty() && c.Container == "" {
  532. return errors.New("container cannot be empty")
  533. }
  534. if err := c.checkCredentials(); err != nil {
  535. return err
  536. }
  537. if c.KeyPrefix != "" {
  538. if strings.HasPrefix(c.KeyPrefix, "/") {
  539. return errors.New("key_prefix cannot start with /")
  540. }
  541. c.KeyPrefix = path.Clean(c.KeyPrefix)
  542. if !strings.HasSuffix(c.KeyPrefix, "/") {
  543. c.KeyPrefix += "/"
  544. }
  545. }
  546. if err := c.checkPartSizeAndConcurrency(); err != nil {
  547. return err
  548. }
  549. if !util.Contains(validAzAccessTier, c.AccessTier) {
  550. return fmt.Errorf("invalid access tier %#v, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
  551. }
  552. return nil
  553. }
  554. // CryptFsConfig defines the configuration to store local files as encrypted
  555. type CryptFsConfig struct {
  556. Passphrase *kms.Secret `json:"passphrase,omitempty"`
  557. }
  558. // HideConfidentialData hides confidential data
  559. func (c *CryptFsConfig) HideConfidentialData() {
  560. if c.Passphrase != nil {
  561. c.Passphrase.Hide()
  562. }
  563. }
  564. func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
  565. if c.Passphrase == nil {
  566. c.Passphrase = kms.NewEmptySecret()
  567. }
  568. if other.Passphrase == nil {
  569. other.Passphrase = kms.NewEmptySecret()
  570. }
  571. return c.Passphrase.IsEqual(other.Passphrase)
  572. }
  573. // ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
  574. func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  575. if err := c.validate(); err != nil {
  576. return util.NewValidationError(fmt.Sprintf("could not validate Crypt fs config: %v", err))
  577. }
  578. if c.Passphrase.IsPlain() {
  579. c.Passphrase.SetAdditionalData(additionalData)
  580. if err := c.Passphrase.Encrypt(); err != nil {
  581. return util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err))
  582. }
  583. }
  584. return nil
  585. }
  586. func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
  587. return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
  588. }
  589. // validate returns an error if the configuration is not valid
  590. func (c *CryptFsConfig) validate() error {
  591. if c.Passphrase == nil || c.Passphrase.IsEmpty() {
  592. return errors.New("invalid passphrase")
  593. }
  594. if !c.Passphrase.IsValidInput() {
  595. return errors.New("passphrase cannot be empty or invalid")
  596. }
  597. if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
  598. return errors.New("invalid encrypted passphrase")
  599. }
  600. return nil
  601. }
  602. // PipeWriter defines a wrapper for pipeat.PipeWriterAt.
  603. type PipeWriter struct {
  604. writer *pipeat.PipeWriterAt
  605. err error
  606. done chan bool
  607. }
  608. // NewPipeWriter initializes a new PipeWriter
  609. func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter {
  610. return &PipeWriter{
  611. writer: w,
  612. err: nil,
  613. done: make(chan bool),
  614. }
  615. }
  616. // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
  617. func (p *PipeWriter) Close() error {
  618. p.writer.Close() //nolint:errcheck // the returned error is always null
  619. <-p.done
  620. return p.err
  621. }
  622. // Done unlocks other goroutines waiting on Close().
  623. // It must be called when the upload ends
  624. func (p *PipeWriter) Done(err error) {
  625. p.err = err
  626. p.done <- true
  627. }
  628. // WriteAt is a wrapper for pipeat WriteAt
  629. func (p *PipeWriter) WriteAt(data []byte, off int64) (int, error) {
  630. return p.writer.WriteAt(data, off)
  631. }
  632. // Write is a wrapper for pipeat Write
  633. func (p *PipeWriter) Write(data []byte) (int, error) {
  634. return p.writer.Write(data)
  635. }
  636. func isEqualityCheckModeValid(mode int) bool {
  637. return mode >= 0 || mode <= 1
  638. }
  639. // isDirectory checks if a path exists and is a directory
  640. func isDirectory(fs Fs, path string) (bool, error) {
  641. fileInfo, err := fs.Stat(path)
  642. if err != nil {
  643. return false, err
  644. }
  645. return fileInfo.IsDir(), err
  646. }
  647. // IsLocalOsFs returns true if fs is a local filesystem implementation
  648. func IsLocalOsFs(fs Fs) bool {
  649. return fs.Name() == osFsName
  650. }
  651. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  652. func IsCryptOsFs(fs Fs) bool {
  653. return fs.Name() == cryptFsName
  654. }
  655. // IsSFTPFs returns true if fs is an SFTP filesystem
  656. func IsSFTPFs(fs Fs) bool {
  657. return strings.HasPrefix(fs.Name(), sftpFsName)
  658. }
  659. // IsHTTPFs returns true if fs is an HTTP filesystem
  660. func IsHTTPFs(fs Fs) bool {
  661. return strings.HasPrefix(fs.Name(), httpFsName)
  662. }
  663. // IsBufferedSFTPFs returns true if this is a buffered SFTP filesystem
  664. func IsBufferedSFTPFs(fs Fs) bool {
  665. if !IsSFTPFs(fs) {
  666. return false
  667. }
  668. return !fs.IsUploadResumeSupported()
  669. }
  670. // IsLocalOrUnbufferedSFTPFs returns true if fs is local or SFTP with no buffer
  671. func IsLocalOrUnbufferedSFTPFs(fs Fs) bool {
  672. if IsLocalOsFs(fs) {
  673. return true
  674. }
  675. if IsSFTPFs(fs) {
  676. return fs.IsUploadResumeSupported()
  677. }
  678. return false
  679. }
  680. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  681. func IsLocalOrSFTPFs(fs Fs) bool {
  682. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  683. }
  684. // HasTruncateSupport returns true if the fs supports truncate files
  685. func HasTruncateSupport(fs Fs) bool {
  686. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  687. }
  688. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  689. func HasImplicitAtomicUploads(fs Fs) bool {
  690. if strings.HasPrefix(fs.Name(), s3fsName) {
  691. return true
  692. }
  693. if strings.HasPrefix(fs.Name(), gcsfsName) {
  694. return true
  695. }
  696. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  697. return true
  698. }
  699. return false
  700. }
  701. // HasOpenRWSupport returns true if the fs can open a file
  702. // for reading and writing at the same time
  703. func HasOpenRWSupport(fs Fs) bool {
  704. if IsLocalOsFs(fs) {
  705. return true
  706. }
  707. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  708. return true
  709. }
  710. return false
  711. }
  712. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  713. func IsLocalOrCryptoFs(fs Fs) bool {
  714. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  715. }
  716. // SetPathPermissions calls fs.Chown.
  717. // It does nothing for local filesystem on windows
  718. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  719. if uid == -1 && gid == -1 {
  720. return
  721. }
  722. if IsLocalOsFs(fs) {
  723. if runtime.GOOS == "windows" {
  724. return
  725. }
  726. }
  727. if err := fs.Chown(path, uid, gid); err != nil {
  728. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  729. }
  730. }
  731. func updateFileInfoModTime(storageID, objectPath string, info *FileInfo) (*FileInfo, error) {
  732. if !plugin.Handler.HasMetadater() {
  733. return info, nil
  734. }
  735. if info.IsDir() {
  736. return info, nil
  737. }
  738. mTime, err := plugin.Handler.GetModificationTime(storageID, ensureAbsPath(objectPath), info.IsDir())
  739. if errors.Is(err, metadata.ErrNoSuchObject) {
  740. return info, nil
  741. }
  742. if err != nil {
  743. return info, err
  744. }
  745. info.modTime = util.GetTimeFromMsecSinceEpoch(mTime)
  746. return info, nil
  747. }
  748. func getFolderModTimes(storageID, dirName string) (map[string]int64, error) {
  749. var err error
  750. modTimes := make(map[string]int64)
  751. if plugin.Handler.HasMetadater() {
  752. modTimes, err = plugin.Handler.GetModificationTimes(storageID, ensureAbsPath(dirName))
  753. if err != nil && !errors.Is(err, metadata.ErrNoSuchObject) {
  754. return modTimes, err
  755. }
  756. }
  757. return modTimes, nil
  758. }
  759. func ensureAbsPath(name string) string {
  760. if path.IsAbs(name) {
  761. return name
  762. }
  763. return path.Join("/", name)
  764. }
  765. func fsMetadataCheck(fs fsMetadataChecker, storageID, keyPrefix string) error {
  766. if !plugin.Handler.HasMetadater() {
  767. return nil
  768. }
  769. limit := 100
  770. from := ""
  771. for {
  772. metadataFolders, err := plugin.Handler.GetMetadataFolders(storageID, from, limit)
  773. if err != nil {
  774. fsLog(fs, logger.LevelError, "unable to get folders: %v", err)
  775. return err
  776. }
  777. for _, folder := range metadataFolders {
  778. from = folder
  779. fsPrefix := folder
  780. if !strings.HasSuffix(folder, "/") {
  781. fsPrefix += "/"
  782. }
  783. if keyPrefix != "" {
  784. if !strings.HasPrefix(fsPrefix, "/"+keyPrefix) {
  785. fsLog(fs, logger.LevelDebug, "skip metadata check for folder %#v outside prefix %#v",
  786. folder, keyPrefix)
  787. continue
  788. }
  789. }
  790. fsLog(fs, logger.LevelDebug, "check metadata for folder %#v", folder)
  791. metadataValues, err := plugin.Handler.GetModificationTimes(storageID, folder)
  792. if err != nil {
  793. fsLog(fs, logger.LevelError, "unable to get modification times for folder %#v: %v", folder, err)
  794. return err
  795. }
  796. if len(metadataValues) == 0 {
  797. fsLog(fs, logger.LevelDebug, "no metadata for folder %#v", folder)
  798. continue
  799. }
  800. fileNames, err := fs.getFileNamesInPrefix(fsPrefix)
  801. if err != nil {
  802. fsLog(fs, logger.LevelError, "unable to get content for prefix %#v: %v", fsPrefix, err)
  803. return err
  804. }
  805. // now check if we have metadata for a missing object
  806. for k := range metadataValues {
  807. if _, ok := fileNames[k]; !ok {
  808. filePath := ensureAbsPath(path.Join(folder, k))
  809. if err = plugin.Handler.RemoveMetadata(storageID, filePath); err != nil {
  810. fsLog(fs, logger.LevelError, "unable to remove metadata for missing file %#v: %v", filePath, err)
  811. } else {
  812. fsLog(fs, logger.LevelDebug, "metadata removed for missing file %#v", filePath)
  813. }
  814. }
  815. }
  816. }
  817. if len(metadataFolders) < limit {
  818. return nil
  819. }
  820. }
  821. }
  822. func getMountPath(mountPath string) string {
  823. if mountPath == "/" {
  824. return ""
  825. }
  826. return mountPath
  827. }
  828. func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
  829. logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
  830. }