// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

// Package vfs provides local and remote filesystems support
package vfs

import (
	"errors"
	"fmt"
	"io"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"
	"github.com/sftpgo/sdk"

	"github.com/drakkan/sftpgo/v2/internal/kms"
	"github.com/drakkan/sftpgo/v2/internal/logger"
	"github.com/drakkan/sftpgo/v2/internal/util"
)

const (
	dirMimeType       = "inode/directory"
	s3fsName          = "S3Fs"
	gcsfsName         = "GCSFs"
	azBlobFsName      = "AzureBlobFs"
	lastModifiedField = "sftpgo_last_modified"
	preResumeTimeout  = 90 * time.Second
	// ListerBatchSize defines the default limit for DirLister implementations
	ListerBatchSize = 1000
)

// Additional checks for files
const (
	CheckParentDir     = 1
	CheckResume        = 2
	CheckUpdateModTime = 4
)

var (
	validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
	// ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
	ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
	// ErrVfsUnsupported defines the error for an unsupported VFS operation
	ErrVfsUnsupported        = errors.New("not supported")
	errInvalidDirListerLimit = errors.New("dir lister: invalid limit, must be > 0")
	tempPath                 string
	sftpFingerprints         []string
	allowSelfConnections    int
	renameMode               int
	readMetadata             int
	resumeMaxSize            int64
	uploadMode               int
)
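
// createPipeFn creates the pipe used to stream data between the protocol
// layer and a storage backend. It is a package-level variable, presumably so
// the implementation can be replaced; the default builds file-backed pipes in
// dirPath via github.com/eikenb/pipeat (the size argument is unused here).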
var (
	createPipeFn = func(dirPath string, _ int64) (pipeReaderAt, pipeWriterAt, error) {
		return pipeat.PipeInDir(dirPath)
	}
)

// SetAllowSelfConnections sets the desired behaviour for self connections
func SetAllowSelfConnections(value int) {
	allowSelfConnections = value
}

// SetTempPath sets the path for temporary files
func SetTempPath(fsPath string) {
	tempPath = fsPath
}

// GetTempPath returns the path for temporary files
func GetTempPath() string {
	return tempPath
}

// SetSFTPFingerprints sets the SFTP host key fingerprints
func SetSFTPFingerprints(fp []string) {
	sftpFingerprints = fp
}

// SetRenameMode sets the rename mode
func SetRenameMode(val int) {
	renameMode = val
}

// SetReadMetadataMode sets the read metadata mode
func SetReadMetadataMode(val int) {
	readMetadata = val
}

// SetResumeMaxSize sets the max size allowed for resuming uploads for backends
// with immutable objects
func SetResumeMaxSize(val int64) {
	resumeMaxSize = val
}

// SetUploadMode sets the upload mode
func SetUploadMode(val int) {
	uploadMode = val
}

// Fs defines the interface for filesystem backends
type Fs interface {
	Name() string
	ConnectionID() string
	Stat(name string) (os.FileInfo, error)
	Lstat(name string) (os.FileInfo, error)
	Open(name string, offset int64) (File, PipeReader, func(), error)
	Create(name string, flag, checks int) (File, PipeWriter, func(), error)
	Rename(source, target string, checks int) (int, int64, error)
	Remove(name string, isDir bool) error
	Mkdir(name string) error
	Symlink(source, target string) error
	Chown(name string, uid int, gid int) error
	Chmod(name string, mode os.FileMode) error
	Chtimes(name string, atime, mtime time.Time, isUploading bool) error
	Truncate(name string, size int64) error
	ReadDir(dirname string) (DirLister, error)
	Readlink(name string) (string, error)
	IsUploadResumeSupported() bool
	IsConditionalUploadResumeSupported(size int64) bool
	IsAtomicUploadSupported() bool
	CheckRootPath(username string, uid int, gid int) bool
	ResolvePath(virtualPath string) (string, error)
	IsNotExist(err error) bool
	IsPermission(err error) bool
	IsNotSupported(err error) bool
	ScanRootDirContents() (int, int64, error)
	GetDirSize(dirname string) (int, int64, error)
	GetAtomicUploadPath(name string) string
	GetRelativePath(name string) string
	Walk(root string, walkFn filepath.WalkFunc) error
	Join(elem ...string) string
	HasVirtualFolders() bool
	GetMimeType(name string) (string, error)
	GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
	Close() error
}

// FsRealPather is a Fs that implements the RealPath method.
type FsRealPather interface {
	Fs
	RealPath(p string) (string, error)
}

// FsFileCopier is a Fs that implements the CopyFile method.
type FsFileCopier interface {
	Fs
	CopyFile(source, target string, srcInfo os.FileInfo) (int, int64, error)
}

// File defines an interface representing a SFTPGo file
type File interface {
	io.Reader
	io.Writer
	io.Closer
	io.ReaderAt
	io.WriterAt
	io.Seeker
	Stat() (os.FileInfo, error)
	Name() string
	Truncate(size int64) error
}

// PipeWriter defines an interface representing a SFTPGo pipe writer
type PipeWriter interface {
	io.Writer
	io.WriterAt
	io.Closer
	Done(err error)
	GetWrittenBytes() int64
}

// PipeReader defines an interface representing a SFTPGo pipe reader
type PipeReader interface {
	io.Reader
	io.ReaderAt
	io.Closer
	setMetadata(value map[string]string)
	setMetadataFromPointerVal(value map[string]*string)
	Metadata() map[string]string
}

type pipeReaderAt interface {
	Read(p []byte) (int, error)
	ReadAt(p []byte, offset int64) (int, error)
	GetReadedBytes() int64
	Close() error
	CloseWithError(err error) error
}

type pipeWriterAt interface {
	Write(p []byte) (int, error)
	WriteAt(p []byte, offset int64) (int, error)
	GetWrittenBytes() int64
	Close() error
	CloseWithError(err error) error
}

// DirLister defines an interface for a directory lister
type DirLister interface {
	Next(limit int) ([]os.FileInfo, error)
	Close() error
}
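
// A DirLister is consumed in batches until Next returns io.EOF, as done by
// doRecursiveRename below; a minimal sketch of the pattern:
//
//	lister, err := fs.ReadDir(dirname)
//	if err != nil {
//		return err
//	}
//	defer lister.Close()
//	for {
//		entries, err := lister.Next(ListerBatchSize)
//		finished := errors.Is(err, io.EOF)
//		if err != nil && !finished {
//			return err
//		}
//		// process entries here
//		if finished {
//			return nil
//		}
//	}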

// Metadater defines an interface to implement to return metadata for a file
type Metadater interface {
	Metadata() map[string]string
}
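
// baseDirLister is a partial DirLister implementation, shared by backend
// listers, that serves entries from an in-memory cache.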
type baseDirLister struct {
	cache []os.FileInfo
}

func (l *baseDirLister) Next(limit int) ([]os.FileInfo, error) {
	if limit <= 0 {
		return nil, errInvalidDirListerLimit
	}
	if len(l.cache) >= limit {
		return l.returnFromCache(limit), nil
	}
	return l.returnFromCache(limit), io.EOF
}

func (l *baseDirLister) returnFromCache(limit int) []os.FileInfo {
	if len(l.cache) >= limit {
		result := l.cache[:limit]
		l.cache = l.cache[limit:]
		return result
	}
	result := l.cache
	l.cache = nil
	return result
}

func (l *baseDirLister) Close() error {
	l.cache = nil
	return nil
}

// QuotaCheckResult defines the result for a quota check
type QuotaCheckResult struct {
	HasSpace     bool
	AllowedSize  int64
	AllowedFiles int
	UsedSize     int64
	UsedFiles    int
	QuotaSize    int64
	QuotaFiles   int
}

// GetRemainingSize returns the remaining allowed size
func (q *QuotaCheckResult) GetRemainingSize() int64 {
	if q.QuotaSize > 0 {
		return q.QuotaSize - q.UsedSize
	}
	return 0
}

// GetRemainingFiles returns the remaining allowed files
func (q *QuotaCheckResult) GetRemainingFiles() int {
	if q.QuotaFiles > 0 {
		return q.QuotaFiles - q.UsedFiles
	}
	return 0
}

// S3FsConfig defines the configuration for S3 based filesystem
type S3FsConfig struct {
	sdk.BaseS3FsConfig
	AccessSecret   *kms.Secret `json:"access_secret,omitempty"`
	SSECustomerKey *kms.Secret `json:"sse_customer_key,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *S3FsConfig) HideConfidentialData() {
	if c.AccessSecret != nil {
		c.AccessSecret.Hide()
	}
	if c.SSECustomerKey != nil {
		c.SSECustomerKey.Hide()
	}
}

func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
	if c.Bucket != other.Bucket {
		return false
	}
	if c.KeyPrefix != other.KeyPrefix {
		return false
	}
	if c.Region != other.Region {
		return false
	}
	if c.AccessKey != other.AccessKey {
		return false
	}
	if c.RoleARN != other.RoleARN {
		return false
	}
	if c.Endpoint != other.Endpoint {
		return false
	}
	if c.StorageClass != other.StorageClass {
		return false
	}
	if c.ACL != other.ACL {
		return false
	}
	if !c.areMultipartFieldsEqual(other) {
		return false
	}
	if c.ForcePathStyle != other.ForcePathStyle {
		return false
	}
	if c.SkipTLSVerify != other.SkipTLSVerify {
		return false
	}
	return c.isSecretEqual(other)
}

func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
	if c.UploadPartSize != other.UploadPartSize {
		return false
	}
	if c.UploadConcurrency != other.UploadConcurrency {
		return false
	}
	if c.DownloadConcurrency != other.DownloadConcurrency {
		return false
	}
	if c.DownloadPartSize != other.DownloadPartSize {
		return false
	}
	if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
		return false
	}
	if c.UploadPartMaxTime != other.UploadPartMaxTime {
		return false
	}
	return true
}

func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
	if c.SSECustomerKey == nil {
		c.SSECustomerKey = kms.NewEmptySecret()
	}
	if other.SSECustomerKey == nil {
		other.SSECustomerKey = kms.NewEmptySecret()
	}
	if !c.SSECustomerKey.IsEqual(other.SSECustomerKey) {
		return false
	}
	if c.AccessSecret == nil {
		c.AccessSecret = kms.NewEmptySecret()
	}
	if other.AccessSecret == nil {
		other.AccessSecret = kms.NewEmptySecret()
	}
	return c.AccessSecret.IsEqual(other.AccessSecret)
}
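
// checkCredentials validates the S3 access key/secret pair and the optional
// SSE customer key.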
func (c *S3FsConfig) checkCredentials() error {
	if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
		return util.NewI18nError(
			errors.New("access_key cannot be empty with access_secret not empty"),
			util.I18nErrorAccessKeyRequired,
		)
	}
	if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
		return util.NewI18nError(
			errors.New("access_secret cannot be empty with access_key not empty"),
			util.I18nErrorAccessSecretRequired,
		)
	}
	if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
		return errors.New("invalid encrypted access_secret")
	}
	if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
		return errors.New("invalid access_secret")
	}
	if c.SSECustomerKey.IsEncrypted() && !c.SSECustomerKey.IsValid() {
		return errors.New("invalid encrypted sse_customer_key")
	}
	if !c.SSECustomerKey.IsEmpty() && !c.SSECustomerKey.IsValidInput() {
		return errors.New("invalid sse_customer_key")
	}
	return nil
}

// ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.AccessSecret.IsPlain() {
		c.AccessSecret.SetAdditionalData(additionalData)
		err := c.AccessSecret.Encrypt()
		if err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	if c.SSECustomerKey.IsPlain() {
		c.SSECustomerKey.SetAdditionalData(additionalData)
		err := c.SSECustomerKey.Encrypt()
		if err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt s3 SSE customer key: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}
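
// checkPartSizeAndConcurrency validates the multipart part sizes (0, or
// between 5 and 5000 MB) and the upload/download concurrency (between 0
// and 64).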
func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
	if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
		return util.NewI18nError(
			errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
			util.I18nErrorULPartSizeInvalid,
		)
	}
	if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
			util.I18nErrorULConcurrencyInvalid,
		)
	}
	if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
		return util.NewI18nError(
			errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
			util.I18nErrorDLPartSizeInvalid,
		)
	}
	if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency),
			util.I18nErrorDLConcurrencyInvalid,
		)
	}
	return nil
}

func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
	if c.Bucket != other.Bucket {
		return false
	}
	if c.Endpoint != other.Endpoint {
		return false
	}
	return c.Region == other.Region
}

// validate returns an error if the configuration is not valid
func (c *S3FsConfig) validate() error {
	if c.AccessSecret == nil {
		c.AccessSecret = kms.NewEmptySecret()
	}
	if c.SSECustomerKey == nil {
		c.SSECustomerKey = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	// the region may be embedded within the endpoint for some S3 compatible
	// object storage, for example B2
	if c.Endpoint == "" && c.Region == "" {
		return util.NewI18nError(errors.New("region cannot be empty"), util.I18nErrorRegionRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	return c.checkPartSizeAndConcurrency()
}

// GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
type GCSFsConfig struct {
	sdk.BaseGCSFsConfig
	Credentials *kms.Secret `json:"credentials,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *GCSFsConfig) HideConfidentialData() {
	if c.Credentials != nil {
		c.Credentials.Hide()
	}
}

// ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text
func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.Credentials.IsPlain() {
		c.Credentials.SetAdditionalData(additionalData)
		err := c.Credentials.Encrypt()
		if err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}

func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
	if c.Bucket != other.Bucket {
		return false
	}
	if c.KeyPrefix != other.KeyPrefix {
		return false
	}
	if c.AutomaticCredentials != other.AutomaticCredentials {
		return false
	}
	if c.StorageClass != other.StorageClass {
		return false
	}
	if c.ACL != other.ACL {
		return false
	}
	if c.UploadPartSize != other.UploadPartSize {
		return false
	}
	if c.UploadPartMaxTime != other.UploadPartMaxTime {
		return false
	}
	if c.Credentials == nil {
		c.Credentials = kms.NewEmptySecret()
	}
	if other.Credentials == nil {
		other.Credentials = kms.NewEmptySecret()
	}
	return c.Credentials.IsEqual(other.Credentials)
}

func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
	return c.Bucket == other.Bucket
}

// validate returns an error if the configuration is not valid
func (c *GCSFsConfig) validate() error {
	if c.Credentials == nil || c.AutomaticCredentials == 1 {
		c.Credentials = kms.NewEmptySecret()
	}
	if c.Bucket == "" {
		return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
	}
	if c.KeyPrefix != "" {
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
		return errors.New("invalid encrypted credentials")
	}
	if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
		return util.NewI18nError(errors.New("invalid credentials"), util.I18nErrorFsCredentialsRequired)
	}
	c.StorageClass = strings.TrimSpace(c.StorageClass)
	c.ACL = strings.TrimSpace(c.ACL)
	if c.UploadPartSize < 0 {
		c.UploadPartSize = 0
	}
	if c.UploadPartMaxTime < 0 {
		c.UploadPartMaxTime = 0
	}
	return nil
}

// AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
type AzBlobFsConfig struct {
	sdk.BaseAzBlobFsConfig
	// Storage Account Key, leave blank to use SAS URL.
	// The access key is stored encrypted based on the kms configuration
	AccountKey *kms.Secret `json:"account_key,omitempty"`
	// Shared access signature URL, leave blank if using account/key
	SASURL *kms.Secret `json:"sas_url,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *AzBlobFsConfig) HideConfidentialData() {
	if c.AccountKey != nil {
		c.AccountKey.Hide()
	}
	if c.SASURL != nil {
		c.SASURL.Hide()
	}
}

func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
	if c.Container != other.Container {
		return false
	}
	if c.AccountName != other.AccountName {
		return false
	}
	if c.Endpoint != other.Endpoint {
		return false
	}
	if c.SASURL.IsEmpty() {
		c.SASURL = kms.NewEmptySecret()
	}
	if other.SASURL.IsEmpty() {
		other.SASURL = kms.NewEmptySecret()
	}
	if !c.SASURL.IsEqual(other.SASURL) {
		return false
	}
	if c.KeyPrefix != other.KeyPrefix {
		return false
	}
	if c.UploadPartSize != other.UploadPartSize {
		return false
	}
	if c.UploadConcurrency != other.UploadConcurrency {
		return false
	}
	if c.DownloadPartSize != other.DownloadPartSize {
		return false
	}
	if c.DownloadConcurrency != other.DownloadConcurrency {
		return false
	}
	if c.UseEmulator != other.UseEmulator {
		return false
	}
	if c.AccessTier != other.AccessTier {
		return false
	}
	return c.isSecretEqual(other)
}

func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
	if c.AccountKey == nil {
		c.AccountKey = kms.NewEmptySecret()
	}
	if other.AccountKey == nil {
		other.AccountKey = kms.NewEmptySecret()
	}
	return c.AccountKey.IsEqual(other.AccountKey)
}

// ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.AccountKey.IsPlain() {
		c.AccountKey.SetAdditionalData(additionalData)
		if err := c.AccountKey.Encrypt(); err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	if c.SASURL.IsPlain() {
		c.SASURL.SetAdditionalData(additionalData)
		if err := c.SASURL.Encrypt(); err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}
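
// checkCredentials ensures that either a valid SAS URL is provided or an
// account name (with an optional encrypted account key) is set.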
func (c *AzBlobFsConfig) checkCredentials() error {
	if c.SASURL.IsPlain() {
		_, err := url.Parse(c.SASURL.GetPayload())
		if err != nil {
			return util.NewI18nError(err, util.I18nErrorSASURLInvalid)
		}
		return nil
	}
	if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
		return errors.New("invalid encrypted sas_url")
	}
	if !c.SASURL.IsEmpty() {
		return nil
	}
	if c.AccountName == "" {
		return util.NewI18nError(errors.New("account name is required"), util.I18nErrorAccountNameRequired)
	}
	if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
		return errors.New("invalid encrypted account_key")
	}
	return nil
}

func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
	if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
		return util.NewI18nError(
			fmt.Errorf("invalid upload part size: %v", c.UploadPartSize),
			util.I18nErrorULPartSizeInvalid,
		)
	}
	if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
			util.I18nErrorULConcurrencyInvalid,
		)
	}
	if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
		return util.NewI18nError(
			fmt.Errorf("invalid download part size: %v", c.DownloadPartSize),
			util.I18nErrorDLPartSizeInvalid,
		)
	}
	if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
		return util.NewI18nError(
			fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency),
			util.I18nErrorDLConcurrencyInvalid,
		)
	}
	return nil
}
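
// tryDecrypt attempts to decrypt the account key and the SAS URL, returning
// a wrapped error on failure.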
func (c *AzBlobFsConfig) tryDecrypt() error {
	if err := c.AccountKey.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt account key: %w", err)
	}
	if err := c.SASURL.TryDecrypt(); err != nil {
		return fmt.Errorf("unable to decrypt SAS URL: %w", err)
	}
	return nil
}

func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
	if c.AccountName != other.AccountName {
		return false
	}
	if c.Endpoint != other.Endpoint {
		return false
	}
	return c.SASURL.GetPayload() == other.SASURL.GetPayload()
}

// validate returns an error if the configuration is not valid
func (c *AzBlobFsConfig) validate() error {
	if c.AccountKey == nil {
		c.AccountKey = kms.NewEmptySecret()
	}
	if c.SASURL == nil {
		c.SASURL = kms.NewEmptySecret()
	}
	// the container could be embedded within the SAS URL, we check this at runtime
	if c.SASURL.IsEmpty() && c.Container == "" {
		return util.NewI18nError(errors.New("container cannot be empty"), util.I18nErrorContainerRequired)
	}
	if err := c.checkCredentials(); err != nil {
		return err
	}
	if c.KeyPrefix != "" {
		if strings.HasPrefix(c.KeyPrefix, "/") {
			return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
		}
		c.KeyPrefix = path.Clean(c.KeyPrefix)
		if !strings.HasSuffix(c.KeyPrefix, "/") {
			c.KeyPrefix += "/"
		}
	}
	if err := c.checkPartSizeAndConcurrency(); err != nil {
		return err
	}
	if !slices.Contains(validAzAccessTier, c.AccessTier) {
		return fmt.Errorf("invalid access tier %q, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
	}
	return nil
}

// CryptFsConfig defines the configuration to store local files as encrypted
type CryptFsConfig struct {
	sdk.OSFsConfig
	Passphrase *kms.Secret `json:"passphrase,omitempty"`
}

// HideConfidentialData hides confidential data
func (c *CryptFsConfig) HideConfidentialData() {
	if c.Passphrase != nil {
		c.Passphrase.Hide()
	}
}

func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
	if c.Passphrase == nil {
		c.Passphrase = kms.NewEmptySecret()
	}
	if other.Passphrase == nil {
		other.Passphrase = kms.NewEmptySecret()
	}
	return c.Passphrase.IsEqual(other.Passphrase)
}

// ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
	if err := c.validate(); err != nil {
		var errI18n *util.I18nError
		errValidation := util.NewValidationError(fmt.Sprintf("could not validate crypt fs config: %v", err))
		if errors.As(err, &errI18n) {
			return util.NewI18nError(errValidation, errI18n.Message)
		}
		return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
	}
	if c.Passphrase.IsPlain() {
		c.Passphrase.SetAdditionalData(additionalData)
		if err := c.Passphrase.Encrypt(); err != nil {
			return util.NewI18nError(
				util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err)),
				util.I18nErrorFsValidation,
			)
		}
	}
	return nil
}

func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
	return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
}

// validate returns an error if the configuration is not valid
func (c *CryptFsConfig) validate() error {
	if c.Passphrase == nil || c.Passphrase.IsEmpty() {
		return util.NewI18nError(errors.New("invalid passphrase"), util.I18nErrorPassphraseRequired)
	}
	if !c.Passphrase.IsValidInput() {
		return util.NewI18nError(errors.New("passphrase cannot be empty or invalid"), util.I18nErrorPassphraseRequired)
	}
	if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
		return errors.New("invalid encrypted passphrase")
	}
	return nil
}

// pipeWriter defines a wrapper for a pipeWriterAt.
type pipeWriter struct {
	pipeWriterAt
	err  error
	done chan bool
}

// NewPipeWriter initializes a new PipeWriter
func NewPipeWriter(w pipeWriterAt) PipeWriter {
	return &pipeWriter{
		pipeWriterAt: w,
		err:          nil,
		done:         make(chan bool),
	}
}

// Close waits for the upload to end, closes the pipeWriterAt and returns an error if any.
func (p *pipeWriter) Close() error {
	p.pipeWriterAt.Close() //nolint:errcheck // the returned error is always nil
	<-p.done
	return p.err
}

// Done unlocks other goroutines waiting on Close().
// It must be called when the upload ends
func (p *pipeWriter) Done(err error) {
	p.err = err
	p.done <- true
}
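
// The intended flow: the transfer code writes through the PipeWriter and
// finally calls Close(), which blocks until the goroutine performing the
// backend upload reports the result via Done(err).

// newPipeWriterAtOffset wraps a pipeWriterAt so that writes at absolute
// offsets >= offset are translated to pipe-relative positions.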
func newPipeWriterAtOffset(w pipeWriterAt, offset int64) PipeWriter {
	return &pipeWriterAtOffset{
		pipeWriter: &pipeWriter{
			pipeWriterAt: w,
			err:          nil,
			done:         make(chan bool),
		},
		offset:      offset,
		writeOffset: offset,
	}
}

type pipeWriterAtOffset struct {
	*pipeWriter
	offset      int64
	writeOffset int64
}

func (p *pipeWriterAtOffset) WriteAt(buf []byte, off int64) (int, error) {
	if off < p.offset {
		return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, p.offset)
	}
	return p.pipeWriter.WriteAt(buf, off-p.offset)
}

func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
	n, err := p.WriteAt(buf, p.writeOffset)
	p.writeOffset += int64(n)
	return n, err
}

// NewPipeReader initializes a new PipeReader
func NewPipeReader(r pipeReaderAt) PipeReader {
	return &pipeReader{
		pipeReaderAt: r,
	}
}

// pipeReader defines a wrapper for pipeat.PipeReaderAt.
type pipeReader struct {
	pipeReaderAt
	mu       sync.RWMutex
	metadata map[string]string
}

func (p *pipeReader) setMetadata(value map[string]string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.metadata = value
}

func (p *pipeReader) setMetadataFromPointerVal(value map[string]*string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(value) == 0 {
		p.metadata = nil
		return
	}
	p.metadata = map[string]string{}
	for k, v := range value {
		val := util.GetStringFromPointer(v)
		if val != "" {
			p.metadata[k] = val
		}
	}
}

// Metadata implements the Metadater interface
func (p *pipeReader) Metadata() map[string]string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if len(p.metadata) == 0 {
		return nil
	}
	result := make(map[string]string)
	for k, v := range p.metadata {
		result[k] = v
	}
	return result
}
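
// isEqualityCheckModeValid reports whether mode is one of the supported
// equality check modes (0 or 1).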
func isEqualityCheckModeValid(mode int) bool {
	return mode >= 0 && mode <= 1
}

// isDirectory checks if a path exists and is a directory
func isDirectory(fs Fs, path string) (bool, error) {
	fileInfo, err := fs.Stat(path)
	if err != nil {
		return false, err
	}
	return fileInfo.IsDir(), err
}

// IsLocalOsFs returns true if fs is a local filesystem implementation
func IsLocalOsFs(fs Fs) bool {
	return fs.Name() == osFsName
}

// IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
func IsCryptOsFs(fs Fs) bool {
	return fs.Name() == cryptFsName
}

// IsSFTPFs returns true if fs is an SFTP filesystem
func IsSFTPFs(fs Fs) bool {
	return strings.HasPrefix(fs.Name(), sftpFsName)
}

// IsHTTPFs returns true if fs is an HTTP filesystem
func IsHTTPFs(fs Fs) bool {
	return strings.HasPrefix(fs.Name(), httpFsName)
}

// IsBufferedLocalOrSFTPFs returns true if this is a buffered SFTP or local filesystem
func IsBufferedLocalOrSFTPFs(fs Fs) bool {
	if osFs, ok := fs.(*OsFs); ok {
		return osFs.writeBufferSize > 0
	}
	if !IsSFTPFs(fs) {
		return false
	}
	return !fs.IsUploadResumeSupported()
}

// FsOpenReturnsFile returns true if fs.Open returns a *os.File handle
func FsOpenReturnsFile(fs Fs) bool {
	if osFs, ok := fs.(*OsFs); ok {
		return osFs.readBufferSize == 0
	}
	if sftpFs, ok := fs.(*SFTPFs); ok {
		return sftpFs.config.BufferSize == 0
	}
	return false
}

// IsLocalOrSFTPFs returns true if fs is local or SFTP
func IsLocalOrSFTPFs(fs Fs) bool {
	return IsLocalOsFs(fs) || IsSFTPFs(fs)
}

// HasTruncateSupport returns true if the fs supports truncating files
func HasTruncateSupport(fs Fs) bool {
	return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
}

// IsRenameAtomic returns true if renaming a directory is supposed to be atomic
func IsRenameAtomic(fs Fs) bool {
	if strings.HasPrefix(fs.Name(), s3fsName) {
		return false
	}
	if strings.HasPrefix(fs.Name(), gcsfsName) {
		return false
	}
	if strings.HasPrefix(fs.Name(), azBlobFsName) {
		return false
	}
	return true
}

// HasImplicitAtomicUploads returns true if the fs doesn't persist partial files on error
func HasImplicitAtomicUploads(fs Fs) bool {
	if strings.HasPrefix(fs.Name(), s3fsName) {
		return uploadMode&4 == 0
	}
	if strings.HasPrefix(fs.Name(), gcsfsName) {
		return uploadMode&8 == 0
	}
	if strings.HasPrefix(fs.Name(), azBlobFsName) {
		return uploadMode&16 == 0
	}
	return false
}

// HasOpenRWSupport returns true if the fs can open a file
// for reading and writing at the same time
func HasOpenRWSupport(fs Fs) bool {
	if IsLocalOsFs(fs) {
		return true
	}
	if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
		return true
	}
	return false
}

// IsLocalOrCryptoFs returns true if fs is local or local encrypted
func IsLocalOrCryptoFs(fs Fs) bool {
	return IsLocalOsFs(fs) || IsCryptOsFs(fs)
}

// SetPathPermissions calls fs.Chown.
// It does nothing for local filesystem on windows
func SetPathPermissions(fs Fs, path string, uid int, gid int) {
	if uid == -1 && gid == -1 {
		return
	}
	if IsLocalOsFs(fs) {
		if runtime.GOOS == "windows" {
			return
		}
	}
	if err := fs.Chown(path, uid, gid); err != nil {
		fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
	}
}

// IsUploadResumeSupported returns true if resuming uploads is supported
func IsUploadResumeSupported(fs Fs, size int64) bool {
	if fs.IsUploadResumeSupported() {
		return true
	}
	return fs.IsConditionalUploadResumeSupported(size)
}
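
// getLastModified returns the numeric value of the sftpgo_last_modified
// metadata field, or 0 if it is missing or not parsable.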
func getLastModified(metadata map[string]string) int64 {
	if val, ok := metadata[lastModifiedField]; ok && val != "" {
		lastModified, err := strconv.ParseInt(val, 10, 64)
		if err == nil {
			return lastModified
		}
	}
	return 0
}
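
// getAzureLastModified is the equivalent of getLastModified for Azure's
// pointer-valued metadata; the key is matched case-insensitively.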
func getAzureLastModified(metadata map[string]*string) int64 {
	for k, v := range metadata {
		if strings.EqualFold(k, lastModifiedField) {
			if val := util.GetStringFromPointer(v); val != "" {
				lastModified, err := strconv.ParseInt(val, 10, 64)
				if err == nil {
					return lastModified
				}
			}
			return 0
		}
	}
	return 0
}
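
// validateOSFsConfig checks the read/write buffer sizes configured for the
// local filesystem backend.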
func validateOSFsConfig(config *sdk.OSFsConfig) error {
	if config.ReadBufferSize < 0 || config.ReadBufferSize > 10 {
		return fmt.Errorf("invalid read buffer size, it must be between 0 and 10 MB")
	}
	if config.WriteBufferSize < 0 || config.WriteBufferSize > 10 {
		return fmt.Errorf("invalid write buffer size, it must be between 0 and 10 MB")
	}
	return nil
}
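
// doCopy copies from src to dst using the provided buffer, allocating a
// 32 KB buffer when buf is nil. It is similar to io.CopyBuffer but without
// the WriterTo/ReaderFrom fast paths, so the supplied buffer is always used.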
func doCopy(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
	if buf == nil {
		buf = make([]byte, 32768)
	}
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			if nw < 0 || nr < nw {
				nw = 0
				if ew == nil {
					ew = errors.New("invalid write")
				}
			}
			written += int64(nw)
			if ew != nil {
				err = ew
				break
			}
			if nr != nw {
				err = io.ErrShortWrite
				break
			}
		}
		if er != nil {
			if er != io.EOF {
				err = er
			}
			break
		}
	}
	return written, err
}
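
// getMountPath returns the mount path to prepend to virtual paths; the root
// mount "/" is normalized to the empty string.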
func getMountPath(mountPath string) string {
	if mountPath == "/" {
		return ""
	}
	return mountPath
}
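
// getLocalTempDir returns the configured temporary directory, falling back
// to the OS default if none is set.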
func getLocalTempDir() string {
	if tempPath != "" {
		return tempPath
	}
	return filepath.Clean(os.TempDir())
}
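
// doRecursiveRename iterates the source directory in batches and invokes
// renameFn for each entry, accumulating the number of renamed files and
// their total size. Entries that disappear during the walk are skipped and
// recursion depth is bounded by util.MaxRecursion.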
func doRecursiveRename(fs Fs, source, target string,
	renameFn func(string, string, os.FileInfo, int, bool) (int, int64, error),
	recursion int, updateModTime bool,
) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if recursion > util.MaxRecursion {
		return numFiles, filesSize, util.ErrRecursionTooDeep
	}
	recursion++
	lister, err := fs.ReadDir(source)
	if err != nil {
		return numFiles, filesSize, err
	}
	defer lister.Close()
	for {
		entries, err := lister.Next(ListerBatchSize)
		finished := errors.Is(err, io.EOF)
		if err != nil && !finished {
			return numFiles, filesSize, err
		}
		for _, info := range entries {
			sourceEntry := fs.Join(source, info.Name())
			targetEntry := fs.Join(target, info.Name())
			files, size, err := renameFn(sourceEntry, targetEntry, info, recursion, updateModTime)
			if err != nil {
				if fs.IsNotExist(err) {
					fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
					continue
				}
				return numFiles, filesSize, err
			}
			numFiles += files
			filesSize += size
		}
		if finished {
			return numFiles, filesSize, nil
		}
	}
}
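
// fsLog logs a message at the given level using the filesystem name and
// connection ID as the log context.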
func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
	logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
}