sftpd.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. // Package sftpd implements the SSH File Transfer Protocol as described in https://tools.ietf.org/html/draft-ietf-secsh-filexfer-02.
  2. // It uses pkg/sftp library:
  3. // https://github.com/pkg/sftp
  4. package sftpd
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "fmt"
  10. "net/url"
  11. "os"
  12. "os/exec"
  13. "path/filepath"
  14. "sync"
  15. "time"
  16. "github.com/drakkan/sftpgo/dataprovider"
  17. "github.com/drakkan/sftpgo/httpclient"
  18. "github.com/drakkan/sftpgo/logger"
  19. "github.com/drakkan/sftpgo/metrics"
  20. "github.com/drakkan/sftpgo/utils"
  21. )
  22. const (
  23. logSender = "sftpd"
  24. logSenderSCP = "scp"
  25. logSenderSSH = "ssh"
  26. uploadLogSender = "Upload"
  27. downloadLogSender = "Download"
  28. renameLogSender = "Rename"
  29. rmdirLogSender = "Rmdir"
  30. mkdirLogSender = "Mkdir"
  31. symlinkLogSender = "Symlink"
  32. removeLogSender = "Remove"
  33. chownLogSender = "Chown"
  34. chmodLogSender = "Chmod"
  35. chtimesLogSender = "Chtimes"
  36. sshCommandLogSender = "SSHCommand"
  37. operationDownload = "download"
  38. operationUpload = "upload"
  39. operationDelete = "delete"
  40. operationRename = "rename"
  41. operationSSHCmd = "ssh_cmd"
  42. protocolSFTP = "SFTP"
  43. protocolSCP = "SCP"
  44. protocolSSH = "SSH"
  45. handshakeTimeout = 2 * time.Minute
  46. )
  47. const (
  48. uploadModeStandard = iota
  49. uploadModeAtomic
  50. uploadModeAtomicWithResume
  51. )
  52. var (
  53. mutex sync.RWMutex
  54. openConnections map[string]Connection
  55. activeTransfers []*Transfer
  56. idleTimeout time.Duration
  57. activeQuotaScans []ActiveQuotaScan
  58. dataProvider dataprovider.Provider
  59. actions Actions
  60. uploadMode int
  61. setstatMode int
  62. supportedSSHCommands = []string{"scp", "md5sum", "sha1sum", "sha256sum", "sha384sum", "sha512sum", "cd", "pwd",
  63. "git-receive-pack", "git-upload-pack", "git-upload-archive", "rsync"}
  64. defaultSSHCommands = []string{"md5sum", "sha1sum", "cd", "pwd", "scp"}
  65. sshHashCommands = []string{"md5sum", "sha1sum", "sha256sum", "sha384sum", "sha512sum"}
  66. systemCommands = []string{"git-receive-pack", "git-upload-pack", "git-upload-archive", "rsync"}
  67. )
  68. type connectionTransfer struct {
  69. OperationType string `json:"operation_type"`
  70. StartTime int64 `json:"start_time"`
  71. Size int64 `json:"size"`
  72. LastActivity int64 `json:"last_activity"`
  73. Path string `json:"path"`
  74. }
  75. // ActiveQuotaScan defines an active quota scan
  76. type ActiveQuotaScan struct {
  77. // Username to which the quota scan refers
  78. Username string `json:"username"`
  79. // quota scan start time as unix timestamp in milliseconds
  80. StartTime int64 `json:"start_time"`
  81. }
  82. // Actions to execute on SFTP create, download, delete and rename.
  83. // An external command can be executed and/or an HTTP notification can be fired
  84. type Actions struct {
  85. // Valid values are download, upload, delete, rename, ssh_cmd. Empty slice to disable
  86. ExecuteOn []string `json:"execute_on" mapstructure:"execute_on"`
  87. // Absolute path to the command to execute, empty to disable
  88. Command string `json:"command" mapstructure:"command"`
  89. // The URL to notify using an HTTP GET, empty to disable
  90. HTTPNotificationURL string `json:"http_notification_url" mapstructure:"http_notification_url"`
  91. }
  92. // ConnectionStatus status for an active connection
  93. type ConnectionStatus struct {
  94. // Logged in username
  95. Username string `json:"username"`
  96. // Unique identifier for the connection
  97. ConnectionID string `json:"connection_id"`
  98. // client's version string
  99. ClientVersion string `json:"client_version"`
  100. // Remote address for this connection
  101. RemoteAddress string `json:"remote_address"`
  102. // Connection time as unix timestamp in milliseconds
  103. ConnectionTime int64 `json:"connection_time"`
  104. // Last activity as unix timestamp in milliseconds
  105. LastActivity int64 `json:"last_activity"`
  106. // Protocol for this connection: SFTP, SCP, SSH
  107. Protocol string `json:"protocol"`
  108. // active uploads/downloads
  109. Transfers []connectionTransfer `json:"active_transfers"`
  110. // for protocol SSH this is the issued command
  111. SSHCommand string `json:"ssh_command"`
  112. }
  113. type sshSubsystemExitStatus struct {
  114. Status uint32
  115. }
  116. type sshSubsystemExecMsg struct {
  117. Command string
  118. }
  119. type actionNotification struct {
  120. Action string `json:"action"`
  121. Username string `json:"username"`
  122. Path string `json:"path"`
  123. TargetPath string `json:"target_path,omitempty"`
  124. SSHCmd string `json:"ssh_cmd,omitempty"`
  125. FileSize int64 `json:"file_size,omitempty"`
  126. FsProvider int `json:"fs_provider"`
  127. Bucket string `json:"bucket,omitempty"`
  128. Endpoint string `json:"endpoint,omitempty"`
  129. Status int `json:"status"`
  130. }
  131. func newActionNotification(user dataprovider.User, operation, filePath, target, sshCmd string, fileSize int64,
  132. err error) actionNotification {
  133. bucket := ""
  134. endpoint := ""
  135. status := 1
  136. if user.FsConfig.Provider == 1 {
  137. bucket = user.FsConfig.S3Config.Bucket
  138. endpoint = user.FsConfig.S3Config.Endpoint
  139. } else if user.FsConfig.Provider == 2 {
  140. bucket = user.FsConfig.GCSConfig.Bucket
  141. }
  142. if err != nil {
  143. status = 0
  144. }
  145. return actionNotification{
  146. Action: operation,
  147. Username: user.Username,
  148. Path: filePath,
  149. TargetPath: target,
  150. SSHCmd: sshCmd,
  151. FileSize: fileSize,
  152. FsProvider: user.FsConfig.Provider,
  153. Bucket: bucket,
  154. Endpoint: endpoint,
  155. Status: status,
  156. }
  157. }
  158. func (a *actionNotification) AsJSON() []byte {
  159. res, _ := json.Marshal(a)
  160. return res
  161. }
  162. func (a *actionNotification) AsEnvVars() []string {
  163. return []string{fmt.Sprintf("SFTPGO_ACTION=%v", a.Action),
  164. fmt.Sprintf("SFTPGO_ACTION_USERNAME=%v", a.Username),
  165. fmt.Sprintf("SFTPGO_ACTION_PATH=%v", a.Path),
  166. fmt.Sprintf("SFTPGO_ACTION_TARGET=%v", a.TargetPath),
  167. fmt.Sprintf("SFTPGO_ACTION_SSH_CMD=%v", a.SSHCmd),
  168. fmt.Sprintf("SFTPGO_ACTION_FILE_SIZE=%v", a.FileSize),
  169. fmt.Sprintf("SFTPGO_ACTION_FS_PROVIDER=%v", a.FsProvider),
  170. fmt.Sprintf("SFTPGO_ACTION_BUCKET=%v", a.Bucket),
  171. fmt.Sprintf("SFTPGO_ACTION_ENDPOINT=%v", a.Endpoint),
  172. fmt.Sprintf("SFTPGO_ACTION_STATUS=%v", a.Status),
  173. }
  174. }
  175. func init() {
  176. openConnections = make(map[string]Connection)
  177. }
  178. // GetDefaultSSHCommands returns the SSH commands enabled as default
  179. func GetDefaultSSHCommands() []string {
  180. result := make([]string, len(defaultSSHCommands))
  181. copy(result, defaultSSHCommands)
  182. return result
  183. }
  184. // GetSupportedSSHCommands returns the supported SSH commands
  185. func GetSupportedSSHCommands() []string {
  186. result := make([]string, len(supportedSSHCommands))
  187. copy(result, supportedSSHCommands)
  188. return result
  189. }
  190. // GetConnectionDuration returns the connection duration as string
  191. func (c ConnectionStatus) GetConnectionDuration() string {
  192. elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  193. return utils.GetDurationAsString(elapsed)
  194. }
  195. // GetConnectionInfo returns connection info.
  196. // Protocol,Client Version and RemoteAddress are returned.
  197. // For SSH commands the issued command is returned too.
  198. func (c ConnectionStatus) GetConnectionInfo() string {
  199. result := fmt.Sprintf("%v. Client: %#v From: %#v", c.Protocol, c.ClientVersion, c.RemoteAddress)
  200. if c.Protocol == protocolSSH && len(c.SSHCommand) > 0 {
  201. result += fmt.Sprintf(". Command: %#v", c.SSHCommand)
  202. }
  203. return result
  204. }
  205. // GetTransfersAsString returns the active transfers as string
  206. func (c ConnectionStatus) GetTransfersAsString() string {
  207. result := ""
  208. for _, t := range c.Transfers {
  209. if len(result) > 0 {
  210. result += ". "
  211. }
  212. result += t.getConnectionTransferAsString()
  213. }
  214. return result
  215. }
  216. func (t connectionTransfer) getConnectionTransferAsString() string {
  217. result := ""
  218. if t.OperationType == operationUpload {
  219. result += "UL"
  220. } else {
  221. result += "DL"
  222. }
  223. result += fmt.Sprintf(" %#v ", t.Path)
  224. if t.Size > 0 {
  225. elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(t.StartTime))
  226. speed := float64(t.Size) / float64(utils.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  227. result += fmt.Sprintf("Size: %#v Elapsed: %#v Speed: \"%.1f KB/s\"", utils.ByteCountSI(t.Size),
  228. utils.GetDurationAsString(elapsed), speed)
  229. }
  230. return result
  231. }
  232. // SetDataProvider sets the data provider to use to authenticate users and to get/update their disk quota
  233. func SetDataProvider(provider dataprovider.Provider) {
  234. dataProvider = provider
  235. }
  236. func getActiveSessions(username string) int {
  237. mutex.RLock()
  238. defer mutex.RUnlock()
  239. numSessions := 0
  240. for _, c := range openConnections {
  241. if c.User.Username == username {
  242. numSessions++
  243. }
  244. }
  245. return numSessions
  246. }
  247. // GetQuotaScans returns the active quota scans
  248. func GetQuotaScans() []ActiveQuotaScan {
  249. mutex.RLock()
  250. defer mutex.RUnlock()
  251. scans := make([]ActiveQuotaScan, len(activeQuotaScans))
  252. copy(scans, activeQuotaScans)
  253. return scans
  254. }
  255. // AddQuotaScan add a user to the ones with active quota scans.
  256. // Returns false if the user has a quota scan already running
  257. func AddQuotaScan(username string) bool {
  258. mutex.Lock()
  259. defer mutex.Unlock()
  260. for _, s := range activeQuotaScans {
  261. if s.Username == username {
  262. return false
  263. }
  264. }
  265. activeQuotaScans = append(activeQuotaScans, ActiveQuotaScan{
  266. Username: username,
  267. StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
  268. })
  269. return true
  270. }
  271. // RemoveQuotaScan removes a user from the ones with active quota scans
  272. func RemoveQuotaScan(username string) error {
  273. mutex.Lock()
  274. defer mutex.Unlock()
  275. var err error
  276. indexToRemove := -1
  277. for i, s := range activeQuotaScans {
  278. if s.Username == username {
  279. indexToRemove = i
  280. break
  281. }
  282. }
  283. if indexToRemove >= 0 {
  284. activeQuotaScans[indexToRemove] = activeQuotaScans[len(activeQuotaScans)-1]
  285. activeQuotaScans = activeQuotaScans[:len(activeQuotaScans)-1]
  286. } else {
  287. logger.Warn(logSender, "", "quota scan to remove not found for user: %v", username)
  288. err = fmt.Errorf("quota scan to remove not found for user: %v", username)
  289. }
  290. return err
  291. }
  292. // CloseActiveConnection closes an active SFTP connection.
  293. // It returns true on success
  294. func CloseActiveConnection(connectionID string) bool {
  295. result := false
  296. mutex.RLock()
  297. defer mutex.RUnlock()
  298. if c, ok := openConnections[connectionID]; ok {
  299. err := c.close()
  300. c.Log(logger.LevelDebug, logSender, "close connection requested, close err: %v", err)
  301. result = true
  302. }
  303. return result
  304. }
  305. // GetConnectionsStats returns stats for active connections
  306. func GetConnectionsStats() []ConnectionStatus {
  307. mutex.RLock()
  308. defer mutex.RUnlock()
  309. stats := []ConnectionStatus{}
  310. for _, c := range openConnections {
  311. conn := ConnectionStatus{
  312. Username: c.User.Username,
  313. ConnectionID: c.ID,
  314. ClientVersion: c.ClientVersion,
  315. RemoteAddress: c.RemoteAddr.String(),
  316. ConnectionTime: utils.GetTimeAsMsSinceEpoch(c.StartTime),
  317. LastActivity: utils.GetTimeAsMsSinceEpoch(c.lastActivity),
  318. Protocol: c.protocol,
  319. Transfers: []connectionTransfer{},
  320. SSHCommand: c.command,
  321. }
  322. for _, t := range activeTransfers {
  323. if t.connectionID == c.ID {
  324. if t.lastActivity.UnixNano() > c.lastActivity.UnixNano() {
  325. conn.LastActivity = utils.GetTimeAsMsSinceEpoch(t.lastActivity)
  326. }
  327. var operationType string
  328. var size int64
  329. if t.transferType == transferUpload {
  330. operationType = operationUpload
  331. size = t.bytesReceived
  332. } else {
  333. operationType = operationDownload
  334. size = t.bytesSent
  335. }
  336. connTransfer := connectionTransfer{
  337. OperationType: operationType,
  338. StartTime: utils.GetTimeAsMsSinceEpoch(t.start),
  339. Size: size,
  340. LastActivity: utils.GetTimeAsMsSinceEpoch(t.lastActivity),
  341. Path: c.fs.GetRelativePath(t.path),
  342. }
  343. conn.Transfers = append(conn.Transfers, connTransfer)
  344. }
  345. }
  346. stats = append(stats, conn)
  347. }
  348. return stats
  349. }
  350. func startIdleTimer(maxIdleTime time.Duration) {
  351. idleTimeout = maxIdleTime
  352. go func() {
  353. for range time.Tick(5 * time.Minute) {
  354. CheckIdleConnections()
  355. }
  356. }()
  357. }
  358. // CheckIdleConnections disconnects clients idle for too long, based on IdleTimeout setting
  359. func CheckIdleConnections() {
  360. mutex.RLock()
  361. defer mutex.RUnlock()
  362. for _, c := range openConnections {
  363. idleTime := time.Since(c.lastActivity)
  364. for _, t := range activeTransfers {
  365. if t.connectionID == c.ID {
  366. transferIdleTime := time.Since(t.lastActivity)
  367. if transferIdleTime < idleTime {
  368. c.Log(logger.LevelDebug, logSender, "idle time: %v setted to transfer idle time: %v",
  369. idleTime, transferIdleTime)
  370. idleTime = transferIdleTime
  371. }
  372. }
  373. }
  374. if idleTime > idleTimeout {
  375. err := c.close()
  376. c.Log(logger.LevelInfo, logSender, "close idle connection, idle time: %v, close error: %v", idleTime, err)
  377. }
  378. }
  379. }
  380. func addConnection(c Connection) {
  381. mutex.Lock()
  382. defer mutex.Unlock()
  383. openConnections[c.ID] = c
  384. metrics.UpdateActiveConnectionsSize(len(openConnections))
  385. c.Log(logger.LevelDebug, logSender, "connection added, num open connections: %v", len(openConnections))
  386. }
  387. func removeConnection(c Connection) {
  388. mutex.Lock()
  389. defer mutex.Unlock()
  390. delete(openConnections, c.ID)
  391. metrics.UpdateActiveConnectionsSize(len(openConnections))
  392. // we have finished to send data here and most of the time the underlying network connection
  393. // is already closed. Sometime a client can still be reading the last sended data, so we set
  394. // a deadline instead of directly closing the network connection.
  395. // Setting a deadline on an already closed connection has no effect.
  396. // We only need to ensure that a connection will not remain indefinitely open and so the
  397. // underlying file descriptor is not released.
  398. // This should protect us against buggy clients and edge cases.
  399. c.netConn.SetDeadline(time.Now().Add(2 * time.Minute)) //nolint:errcheck
  400. c.Log(logger.LevelDebug, logSender, "connection removed, num open connections: %v", len(openConnections))
  401. }
  402. func addTransfer(transfer *Transfer) {
  403. mutex.Lock()
  404. defer mutex.Unlock()
  405. activeTransfers = append(activeTransfers, transfer)
  406. }
  407. func removeTransfer(transfer *Transfer) error {
  408. mutex.Lock()
  409. defer mutex.Unlock()
  410. var err error
  411. indexToRemove := -1
  412. for i, v := range activeTransfers {
  413. if v == transfer {
  414. indexToRemove = i
  415. break
  416. }
  417. }
  418. if indexToRemove >= 0 {
  419. activeTransfers[indexToRemove] = activeTransfers[len(activeTransfers)-1]
  420. activeTransfers = activeTransfers[:len(activeTransfers)-1]
  421. } else {
  422. logger.Warn(logSender, transfer.connectionID, "transfer to remove not found!")
  423. err = fmt.Errorf("transfer to remove not found")
  424. }
  425. return err
  426. }
  427. func updateConnectionActivity(id string) {
  428. mutex.Lock()
  429. defer mutex.Unlock()
  430. if c, ok := openConnections[id]; ok {
  431. c.lastActivity = time.Now()
  432. openConnections[id] = c
  433. }
  434. }
  435. func isAtomicUploadEnabled() bool {
  436. return uploadMode == uploadModeAtomic || uploadMode == uploadModeAtomicWithResume
  437. }
  438. func executeNotificationCommand(a actionNotification) error {
  439. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  440. defer cancel()
  441. cmd := exec.CommandContext(ctx, actions.Command, a.Action, a.Username, a.Path, a.TargetPath, a.SSHCmd)
  442. cmd.Env = append(os.Environ(), a.AsEnvVars()...)
  443. startTime := time.Now()
  444. err := cmd.Run()
  445. logger.Debug(logSender, "", "executed command %#v with arguments: %#v, %#v, %#v, %#v, %#v, elapsed: %v, error: %v",
  446. actions.Command, a.Action, a.Username, a.Path, a.TargetPath, a.SSHCmd, time.Since(startTime), err)
  447. return err
  448. }
  449. // executed in a goroutine
  450. func executeAction(a actionNotification) error {
  451. if !utils.IsStringInSlice(a.Action, actions.ExecuteOn) {
  452. return nil
  453. }
  454. var err error
  455. if len(actions.Command) > 0 && filepath.IsAbs(actions.Command) {
  456. // we are in a goroutine but if we have to send an HTTP notification we don't want to wait for the
  457. // end of the command
  458. if len(actions.HTTPNotificationURL) > 0 {
  459. go executeNotificationCommand(a) //nolint:errcheck
  460. } else {
  461. err = executeNotificationCommand(a) //nolint:errcheck
  462. }
  463. }
  464. if len(actions.HTTPNotificationURL) > 0 {
  465. var url *url.URL
  466. url, err = url.Parse(actions.HTTPNotificationURL)
  467. if err != nil {
  468. logger.Warn(logSender, "", "Invalid http_notification_url %#v for operation %#v: %v", actions.HTTPNotificationURL,
  469. a.Action, err)
  470. return err
  471. }
  472. startTime := time.Now()
  473. httpClient := httpclient.GetHTTPClient()
  474. resp, err := httpClient.Post(url.String(), "application/json", bytes.NewBuffer(a.AsJSON()))
  475. respCode := 0
  476. if err == nil {
  477. respCode = resp.StatusCode
  478. resp.Body.Close()
  479. }
  480. logger.Debug(logSender, "", "notified operation %#v to URL: %v status code: %v, elapsed: %v err: %v",
  481. a.Action, url.String(), respCode, time.Since(startTime), err)
  482. }
  483. return err
  484. }