sftpfs.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. package vfs
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/eikenb/pipeat"
  15. "github.com/pkg/sftp"
  16. "github.com/rs/xid"
  17. "golang.org/x/crypto/ssh"
  18. "github.com/drakkan/sftpgo/kms"
  19. "github.com/drakkan/sftpgo/logger"
  20. "github.com/drakkan/sftpgo/utils"
  21. "github.com/drakkan/sftpgo/version"
  22. )
  23. const (
  24. // osFsName is the name for the local Fs implementation
  25. sftpFsName = "sftpfs"
  26. )
  27. // SFTPFsConfig defines the configuration for SFTP based filesystem
  28. type SFTPFsConfig struct {
  29. Endpoint string `json:"endpoint,omitempty"`
  30. Username string `json:"username,omitempty"`
  31. Password *kms.Secret `json:"password,omitempty"`
  32. PrivateKey *kms.Secret `json:"private_key,omitempty"`
  33. Fingerprints []string `json:"fingerprints,omitempty"`
  34. // Prefix is the path prefix to strip from SFTP resource paths.
  35. Prefix string `json:"prefix,omitempty"`
  36. // Concurrent reads are safe to use and disabling them will degrade performance.
  37. // Some servers automatically delete files once they are downloaded.
  38. // Using concurrent reads is problematic with such servers.
  39. DisableCouncurrentReads bool `json:"disable_concurrent_reads,omitempty"`
  40. }
  41. func (c *SFTPFsConfig) isEqual(other *SFTPFsConfig) bool {
  42. if c.Endpoint != other.Endpoint {
  43. return false
  44. }
  45. if c.Username != other.Username {
  46. return false
  47. }
  48. if c.Prefix != other.Prefix {
  49. return false
  50. }
  51. if c.DisableCouncurrentReads != other.DisableCouncurrentReads {
  52. return false
  53. }
  54. if len(c.Fingerprints) != len(other.Fingerprints) {
  55. return false
  56. }
  57. for _, fp := range c.Fingerprints {
  58. if !utils.IsStringInSlice(fp, other.Fingerprints) {
  59. return false
  60. }
  61. }
  62. c.setEmptyCredentialsIfNil()
  63. other.setEmptyCredentialsIfNil()
  64. if !c.Password.IsEqual(other.Password) {
  65. return false
  66. }
  67. return c.PrivateKey.IsEqual(other.PrivateKey)
  68. }
  69. func (c *SFTPFsConfig) setEmptyCredentialsIfNil() {
  70. if c.Password == nil {
  71. c.Password = kms.NewEmptySecret()
  72. }
  73. if c.PrivateKey == nil {
  74. c.PrivateKey = kms.NewEmptySecret()
  75. }
  76. }
  77. // Validate returns an error if the configuration is not valid
  78. func (c *SFTPFsConfig) Validate() error {
  79. c.setEmptyCredentialsIfNil()
  80. if c.Endpoint == "" {
  81. return errors.New("endpoint cannot be empty")
  82. }
  83. _, _, err := net.SplitHostPort(c.Endpoint)
  84. if err != nil {
  85. return fmt.Errorf("invalid endpoint: %v", err)
  86. }
  87. if c.Username == "" {
  88. return errors.New("username cannot be empty")
  89. }
  90. if c.Password.IsEmpty() && c.PrivateKey.IsEmpty() {
  91. return errors.New("credentials cannot be empty")
  92. }
  93. if c.Password.IsEncrypted() && !c.Password.IsValid() {
  94. return errors.New("invalid encrypted password")
  95. }
  96. if !c.Password.IsEmpty() && !c.Password.IsValidInput() {
  97. return errors.New("invalid password")
  98. }
  99. if c.PrivateKey.IsEncrypted() && !c.PrivateKey.IsValid() {
  100. return errors.New("invalid encrypted private key")
  101. }
  102. if !c.PrivateKey.IsEmpty() && !c.PrivateKey.IsValidInput() {
  103. return errors.New("invalid private key")
  104. }
  105. if c.Prefix != "" {
  106. c.Prefix = utils.CleanPath(c.Prefix)
  107. } else {
  108. c.Prefix = "/"
  109. }
  110. return nil
  111. }
  112. // EncryptCredentials encrypts password and/or private key if they are in plain text
  113. func (c *SFTPFsConfig) EncryptCredentials(additionalData string) error {
  114. if c.Password.IsPlain() {
  115. c.Password.SetAdditionalData(additionalData)
  116. if err := c.Password.Encrypt(); err != nil {
  117. return err
  118. }
  119. }
  120. if c.PrivateKey.IsPlain() {
  121. c.PrivateKey.SetAdditionalData(additionalData)
  122. if err := c.PrivateKey.Encrypt(); err != nil {
  123. return err
  124. }
  125. }
  126. return nil
  127. }
  128. // SFTPFs is a Fs implementation for SFTP backends
  129. type SFTPFs struct {
  130. sync.Mutex
  131. connectionID string
  132. // if not empty this fs is mouted as virtual folder in the specified path
  133. mountPath string
  134. config *SFTPFsConfig
  135. sshClient *ssh.Client
  136. sftpClient *sftp.Client
  137. err chan error
  138. }
  139. // NewSFTPFs returns an SFTPFa object that allows to interact with an SFTP server
  140. func NewSFTPFs(connectionID, mountPath string, config SFTPFsConfig) (Fs, error) {
  141. if err := config.Validate(); err != nil {
  142. return nil, err
  143. }
  144. if !config.Password.IsEmpty() {
  145. if err := config.Password.TryDecrypt(); err != nil {
  146. return nil, err
  147. }
  148. }
  149. if !config.PrivateKey.IsEmpty() {
  150. if err := config.PrivateKey.TryDecrypt(); err != nil {
  151. return nil, err
  152. }
  153. }
  154. sftpFs := &SFTPFs{
  155. connectionID: connectionID,
  156. mountPath: mountPath,
  157. config: &config,
  158. err: make(chan error, 1),
  159. }
  160. err := sftpFs.createConnection()
  161. return sftpFs, err
  162. }
  163. // Name returns the name for the Fs implementation
  164. func (fs *SFTPFs) Name() string {
  165. return fmt.Sprintf("%v %#v", sftpFsName, fs.config.Endpoint)
  166. }
  167. // ConnectionID returns the connection ID associated to this Fs implementation
  168. func (fs *SFTPFs) ConnectionID() string {
  169. return fs.connectionID
  170. }
  171. // Stat returns a FileInfo describing the named file
  172. func (fs *SFTPFs) Stat(name string) (os.FileInfo, error) {
  173. if err := fs.checkConnection(); err != nil {
  174. return nil, err
  175. }
  176. info, err := fs.sftpClient.Stat(name)
  177. if err != nil {
  178. return nil, err
  179. }
  180. fi := NewFileInfo(info.Name(), info.IsDir(), info.Size(), info.ModTime(), false)
  181. fi.SetMode(info.Mode())
  182. return fi, nil
  183. }
  184. // Lstat returns a FileInfo describing the named file
  185. func (fs *SFTPFs) Lstat(name string) (os.FileInfo, error) {
  186. if err := fs.checkConnection(); err != nil {
  187. return nil, err
  188. }
  189. info, err := fs.sftpClient.Lstat(name)
  190. if err != nil {
  191. return nil, err
  192. }
  193. fi := NewFileInfo(info.Name(), info.IsDir(), info.Size(), info.ModTime(), false)
  194. fi.SetMode(info.Mode())
  195. return fi, nil
  196. }
  197. // Open opens the named file for reading
  198. func (fs *SFTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
  199. if err := fs.checkConnection(); err != nil {
  200. return nil, nil, nil, err
  201. }
  202. f, err := fs.sftpClient.Open(name)
  203. return f, nil, nil, err
  204. }
  205. // Create creates or opens the named file for writing
  206. func (fs *SFTPFs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
  207. err := fs.checkConnection()
  208. if err != nil {
  209. return nil, nil, nil, err
  210. }
  211. var f File
  212. if flag == 0 {
  213. f, err = fs.sftpClient.Create(name)
  214. } else {
  215. f, err = fs.sftpClient.OpenFile(name, flag)
  216. }
  217. return f, nil, nil, err
  218. }
  219. // Rename renames (moves) source to target.
  220. func (fs *SFTPFs) Rename(source, target string) error {
  221. if err := fs.checkConnection(); err != nil {
  222. return err
  223. }
  224. return fs.sftpClient.Rename(source, target)
  225. }
  226. // Remove removes the named file or (empty) directory.
  227. func (fs *SFTPFs) Remove(name string, isDir bool) error {
  228. if err := fs.checkConnection(); err != nil {
  229. return err
  230. }
  231. return fs.sftpClient.Remove(name)
  232. }
  233. // Mkdir creates a new directory with the specified name and default permissions
  234. func (fs *SFTPFs) Mkdir(name string) error {
  235. if err := fs.checkConnection(); err != nil {
  236. return err
  237. }
  238. return fs.sftpClient.Mkdir(name)
  239. }
  240. // MkdirAll creates a directory named path, along with any necessary parents,
  241. // and returns nil, or else returns an error.
  242. // If path is already a directory, MkdirAll does nothing and returns nil.
  243. func (fs *SFTPFs) MkdirAll(name string, uid int, gid int) error {
  244. if err := fs.checkConnection(); err != nil {
  245. return err
  246. }
  247. return fs.sftpClient.MkdirAll(name)
  248. }
  249. // Symlink creates source as a symbolic link to target.
  250. func (fs *SFTPFs) Symlink(source, target string) error {
  251. if err := fs.checkConnection(); err != nil {
  252. return err
  253. }
  254. return fs.sftpClient.Symlink(source, target)
  255. }
  256. // Readlink returns the destination of the named symbolic link
  257. func (fs *SFTPFs) Readlink(name string) (string, error) {
  258. if err := fs.checkConnection(); err != nil {
  259. return "", err
  260. }
  261. return fs.sftpClient.ReadLink(name)
  262. }
  263. // Chown changes the numeric uid and gid of the named file.
  264. func (fs *SFTPFs) Chown(name string, uid int, gid int) error {
  265. if err := fs.checkConnection(); err != nil {
  266. return err
  267. }
  268. return fs.sftpClient.Chown(name, uid, gid)
  269. }
  270. // Chmod changes the mode of the named file to mode.
  271. func (fs *SFTPFs) Chmod(name string, mode os.FileMode) error {
  272. if err := fs.checkConnection(); err != nil {
  273. return err
  274. }
  275. return fs.sftpClient.Chmod(name, mode)
  276. }
  277. // Chtimes changes the access and modification times of the named file.
  278. func (fs *SFTPFs) Chtimes(name string, atime, mtime time.Time) error {
  279. if err := fs.checkConnection(); err != nil {
  280. return err
  281. }
  282. return fs.sftpClient.Chtimes(name, atime, mtime)
  283. }
  284. // Truncate changes the size of the named file.
  285. func (fs *SFTPFs) Truncate(name string, size int64) error {
  286. if err := fs.checkConnection(); err != nil {
  287. return err
  288. }
  289. return fs.sftpClient.Truncate(name, size)
  290. }
  291. // ReadDir reads the directory named by dirname and returns
  292. // a list of directory entries.
  293. func (fs *SFTPFs) ReadDir(dirname string) ([]os.FileInfo, error) {
  294. if err := fs.checkConnection(); err != nil {
  295. return nil, err
  296. }
  297. entries, err := fs.sftpClient.ReadDir(dirname)
  298. if err != nil {
  299. return nil, err
  300. }
  301. result := make([]os.FileInfo, 0, len(entries))
  302. for _, entry := range entries {
  303. info := NewFileInfo(entry.Name(), entry.IsDir(), entry.Size(), entry.ModTime(), false)
  304. info.SetMode(entry.Mode())
  305. result = append(result, info)
  306. }
  307. return result, nil
  308. }
  309. // IsUploadResumeSupported returns true if upload resume is supported.
  310. func (*SFTPFs) IsUploadResumeSupported() bool {
  311. return true
  312. }
  313. // IsAtomicUploadSupported returns true if atomic upload is supported.
  314. func (*SFTPFs) IsAtomicUploadSupported() bool {
  315. return true
  316. }
  317. // IsNotExist returns a boolean indicating whether the error is known to
  318. // report that a file or directory does not exist
  319. func (*SFTPFs) IsNotExist(err error) bool {
  320. return os.IsNotExist(err)
  321. }
  322. // IsPermission returns a boolean indicating whether the error is known to
  323. // report that permission is denied.
  324. func (*SFTPFs) IsPermission(err error) bool {
  325. return os.IsPermission(err)
  326. }
  327. // IsNotSupported returns true if the error indicate an unsupported operation
  328. func (*SFTPFs) IsNotSupported(err error) bool {
  329. if err == nil {
  330. return false
  331. }
  332. return err == ErrVfsUnsupported
  333. }
  334. // CheckRootPath creates the specified local root directory if it does not exists
  335. func (fs *SFTPFs) CheckRootPath(username string, uid int, gid int) bool {
  336. if fs.config.Prefix == "/" {
  337. return true
  338. }
  339. if err := fs.MkdirAll(fs.config.Prefix, uid, gid); err != nil {
  340. fsLog(fs, logger.LevelDebug, "error creating root directory %#v for user %#v: %v", fs.config.Prefix, username, err)
  341. return false
  342. }
  343. return true
  344. }
  345. // ScanRootDirContents returns the number of files contained in a directory and
  346. // their size
  347. func (fs *SFTPFs) ScanRootDirContents() (int, int64, error) {
  348. return fs.GetDirSize(fs.config.Prefix)
  349. }
  350. // GetAtomicUploadPath returns the path to use for an atomic upload
  351. func (*SFTPFs) GetAtomicUploadPath(name string) string {
  352. dir := path.Dir(name)
  353. guid := xid.New().String()
  354. return path.Join(dir, ".sftpgo-upload."+guid+"."+path.Base(name))
  355. }
  356. // GetRelativePath returns the path for a file relative to the sftp prefix if any.
  357. // This is the path as seen by SFTPGo users
  358. func (fs *SFTPFs) GetRelativePath(name string) string {
  359. rel := path.Clean(name)
  360. if rel == "." {
  361. rel = ""
  362. }
  363. if !path.IsAbs(rel) {
  364. return "/" + rel
  365. }
  366. if fs.config.Prefix != "/" {
  367. if !strings.HasPrefix(rel, fs.config.Prefix) {
  368. rel = "/"
  369. }
  370. rel = path.Clean("/" + strings.TrimPrefix(rel, fs.config.Prefix))
  371. }
  372. if fs.mountPath != "" {
  373. rel = path.Join(fs.mountPath, rel)
  374. }
  375. return rel
  376. }
  377. // Walk walks the file tree rooted at root, calling walkFn for each file or
  378. // directory in the tree, including root
  379. func (fs *SFTPFs) Walk(root string, walkFn filepath.WalkFunc) error {
  380. if err := fs.checkConnection(); err != nil {
  381. return err
  382. }
  383. walker := fs.sftpClient.Walk(root)
  384. for walker.Step() {
  385. err := walker.Err()
  386. if err != nil {
  387. return err
  388. }
  389. err = walkFn(walker.Path(), walker.Stat(), err)
  390. if err != nil {
  391. return err
  392. }
  393. }
  394. return nil
  395. }
  396. // Join joins any number of path elements into a single path
  397. func (*SFTPFs) Join(elem ...string) string {
  398. return path.Join(elem...)
  399. }
  400. // HasVirtualFolders returns true if folders are emulated
  401. func (*SFTPFs) HasVirtualFolders() bool {
  402. return false
  403. }
  404. // ResolvePath returns the matching filesystem path for the specified virtual path
  405. func (fs *SFTPFs) ResolvePath(virtualPath string) (string, error) {
  406. if fs.mountPath != "" {
  407. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  408. }
  409. if !path.IsAbs(virtualPath) {
  410. virtualPath = path.Clean("/" + virtualPath)
  411. }
  412. fsPath := fs.Join(fs.config.Prefix, virtualPath)
  413. if fs.config.Prefix != "/" && fsPath != "/" {
  414. // we need to check if this path is a symlink outside the given prefix
  415. // or a file/dir inside a dir symlinked outside the prefix
  416. if err := fs.checkConnection(); err != nil {
  417. return "", err
  418. }
  419. var validatedPath string
  420. var err error
  421. validatedPath, err = fs.getRealPath(fsPath)
  422. if err != nil && !os.IsNotExist(err) {
  423. fsLog(fs, logger.LevelWarn, "Invalid path resolution, original path %v resolved %#v err: %v",
  424. virtualPath, fsPath, err)
  425. return "", err
  426. } else if os.IsNotExist(err) {
  427. for os.IsNotExist(err) {
  428. validatedPath = path.Dir(validatedPath)
  429. if validatedPath == "/" {
  430. err = nil
  431. break
  432. }
  433. validatedPath, err = fs.getRealPath(validatedPath)
  434. }
  435. if err != nil {
  436. fsLog(fs, logger.LevelWarn, "Invalid path resolution, dir %#v original path %#v resolved %#v err: %v",
  437. validatedPath, virtualPath, fsPath, err)
  438. return "", err
  439. }
  440. }
  441. if err := fs.isSubDir(validatedPath); err != nil {
  442. fsLog(fs, logger.LevelWarn, "Invalid path resolution, dir %#v original path %#v resolved %#v err: %v",
  443. validatedPath, virtualPath, fsPath, err)
  444. return "", err
  445. }
  446. }
  447. return fsPath, nil
  448. }
  449. // getRealPath returns the real remote path trying to resolve symbolic links if any
  450. func (fs *SFTPFs) getRealPath(name string) (string, error) {
  451. info, err := fs.sftpClient.Lstat(name)
  452. if err != nil {
  453. return name, err
  454. }
  455. if info.Mode()&os.ModeSymlink != 0 {
  456. return fs.sftpClient.ReadLink(name)
  457. }
  458. return name, err
  459. }
  460. func (fs *SFTPFs) isSubDir(name string) error {
  461. if name == fs.config.Prefix {
  462. return nil
  463. }
  464. if len(name) < len(fs.config.Prefix) {
  465. err := fmt.Errorf("path %#v is not inside: %#v", name, fs.config.Prefix)
  466. return err
  467. }
  468. if !strings.HasPrefix(name, fs.config.Prefix+"/") {
  469. err := fmt.Errorf("path %#v is not inside: %#v", name, fs.config.Prefix)
  470. return err
  471. }
  472. return nil
  473. }
  474. // GetDirSize returns the number of files and the size for a folder
  475. // including any subfolders
  476. func (fs *SFTPFs) GetDirSize(dirname string) (int, int64, error) {
  477. numFiles := 0
  478. size := int64(0)
  479. if err := fs.checkConnection(); err != nil {
  480. return numFiles, size, err
  481. }
  482. isDir, err := IsDirectory(fs, dirname)
  483. if err == nil && isDir {
  484. walker := fs.sftpClient.Walk(dirname)
  485. for walker.Step() {
  486. err := walker.Err()
  487. if err != nil {
  488. return numFiles, size, err
  489. }
  490. if walker.Stat().Mode().IsRegular() {
  491. size += walker.Stat().Size()
  492. numFiles++
  493. }
  494. }
  495. }
  496. return numFiles, size, err
  497. }
  498. // GetMimeType returns the content type
  499. func (fs *SFTPFs) GetMimeType(name string) (string, error) {
  500. if err := fs.checkConnection(); err != nil {
  501. return "", err
  502. }
  503. f, err := fs.sftpClient.OpenFile(name, os.O_RDONLY)
  504. if err != nil {
  505. return "", err
  506. }
  507. defer f.Close()
  508. var buf [512]byte
  509. n, err := io.ReadFull(f, buf[:])
  510. if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
  511. return "", err
  512. }
  513. ctype := http.DetectContentType(buf[:n])
  514. // Rewind file.
  515. _, err = f.Seek(0, io.SeekStart)
  516. return ctype, err
  517. }
  518. // GetAvailableDiskSize return the available size for the specified path
  519. func (fs *SFTPFs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
  520. if err := fs.checkConnection(); err != nil {
  521. return nil, err
  522. }
  523. if _, ok := fs.sftpClient.HasExtension("[email protected]"); !ok {
  524. return nil, ErrStorageSizeUnavailable
  525. }
  526. return fs.sftpClient.StatVFS(dirName)
  527. }
  528. // Close the connection
  529. func (fs *SFTPFs) Close() error {
  530. fs.Lock()
  531. defer fs.Unlock()
  532. var sftpErr, sshErr error
  533. if fs.sftpClient != nil {
  534. sftpErr = fs.sftpClient.Close()
  535. }
  536. if fs.sshClient != nil {
  537. sshErr = fs.sshClient.Close()
  538. }
  539. if sftpErr != nil {
  540. return sftpErr
  541. }
  542. return sshErr
  543. }
  544. func (fs *SFTPFs) checkConnection() error {
  545. err := fs.closed()
  546. if err == nil {
  547. return nil
  548. }
  549. return fs.createConnection()
  550. }
  551. func (fs *SFTPFs) createConnection() error {
  552. fs.Lock()
  553. defer fs.Unlock()
  554. var err error
  555. clientConfig := &ssh.ClientConfig{
  556. User: fs.config.Username,
  557. HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
  558. if len(fs.config.Fingerprints) > 0 {
  559. fp := ssh.FingerprintSHA256(key)
  560. for _, provided := range fs.config.Fingerprints {
  561. if provided == fp {
  562. return nil
  563. }
  564. }
  565. return fmt.Errorf("invalid fingerprint %#v", fp)
  566. }
  567. fsLog(fs, logger.LevelWarn, "login without host key validation, please provide at least a fingerprint!")
  568. return nil
  569. },
  570. ClientVersion: fmt.Sprintf("SSH-2.0-SFTPGo_%v", version.Get().Version),
  571. }
  572. if fs.config.PrivateKey.GetPayload() != "" {
  573. signer, err := ssh.ParsePrivateKey([]byte(fs.config.PrivateKey.GetPayload()))
  574. if err != nil {
  575. fs.err <- err
  576. return err
  577. }
  578. clientConfig.Auth = append(clientConfig.Auth, ssh.PublicKeys(signer))
  579. }
  580. if fs.config.Password.GetPayload() != "" {
  581. clientConfig.Auth = append(clientConfig.Auth, ssh.Password(fs.config.Password.GetPayload()))
  582. }
  583. fs.sshClient, err = ssh.Dial("tcp", fs.config.Endpoint, clientConfig)
  584. if err != nil {
  585. fs.err <- err
  586. return err
  587. }
  588. fs.sftpClient, err = sftp.NewClient(fs.sshClient)
  589. if err != nil {
  590. fs.sshClient.Close()
  591. fs.err <- err
  592. return err
  593. }
  594. if fs.config.DisableCouncurrentReads {
  595. fsLog(fs, logger.LevelDebug, "disabling concurrent reads")
  596. opt := sftp.UseConcurrentReads(false)
  597. opt(fs.sftpClient) //nolint:errcheck
  598. }
  599. go fs.wait()
  600. return nil
  601. }
  602. func (fs *SFTPFs) wait() {
  603. // we wait on the sftp client otherwise if the channel is closed but not the connection
  604. // we don't detect the event.
  605. fs.err <- fs.sftpClient.Wait()
  606. fsLog(fs, logger.LevelDebug, "sftp channel closed")
  607. fs.Lock()
  608. defer fs.Unlock()
  609. if fs.sshClient != nil {
  610. fs.sshClient.Close()
  611. }
  612. }
  613. func (fs *SFTPFs) closed() error {
  614. select {
  615. case err := <-fs.err:
  616. return err
  617. default:
  618. return nil
  619. }
  620. }