sftpd.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. // Package sftpd implements the SSH File Transfer Protocol as described in https://tools.ietf.org/html/draft-ietf-secsh-filexfer-02.
  2. // It uses pkg/sftp library:
  3. // https://github.com/pkg/sftp
  4. package sftpd
  5. import (
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "os/exec"
  11. "path/filepath"
  12. "sync"
  13. "time"
  14. "github.com/drakkan/sftpgo/dataprovider"
  15. "github.com/drakkan/sftpgo/logger"
  16. "github.com/drakkan/sftpgo/metrics"
  17. "github.com/drakkan/sftpgo/utils"
  18. )
const (
	// Sender names passed to the logger. logSender is the one used by the
	// package-level helpers in this file.
	logSender    = "sftpd"
	logSenderSCP = "scp"
	// Per-operation log sender names.
	// NOTE(review): not referenced in this part of the file; presumably used
	// by the request/transfer handlers — confirm.
	uploadLogSender   = "Upload"
	downloadLogSender = "Download"
	renameLogSender   = "Rename"
	rmdirLogSender    = "Rmdir"
	mkdirLogSender    = "Mkdir"
	symlinkLogSender  = "Symlink"
	removeLogSender   = "Remove"
	chownLogSender    = "Chown"
	chmodLogSender    = "Chmod"
	// Operation names. They match the valid values documented for
	// Actions.ExecuteOn; upload and download are also used as the
	// OperationType of a connectionTransfer.
	operationDownload = "download"
	operationUpload   = "upload"
	operationDelete   = "delete"
	operationRename   = "rename"
	// Protocol labels, matching the values documented for ConnectionStatus.Protocol.
	protocolSFTP = "SFTP"
	protocolSCP  = "SCP"
	// NOTE(review): presumably the maximum time allowed to complete the SSH
	// handshake; not referenced in this part of the file — confirm.
	handshakeTimeout = 2 * time.Minute
)
// Values for the uploadMode setting. isAtomicUploadEnabled reports true for
// uploadModeAtomic and uploadModeAtomicWithResume, false for uploadModeStandard.
const (
	uploadModeStandard = iota
	uploadModeAtomic
	uploadModeAtomicWithResume
)
// Package-level state shared by all connections.
var (
	// mutex is held by the functions in this file that read or modify
	// openConnections, activeTransfers and activeQuotaScans
	mutex sync.RWMutex
	// openConnections maps a connection ID to its Connection; allocated in init
	openConnections map[string]Connection
	// activeTransfers lists the uploads/downloads currently in progress
	activeTransfers []*Transfer
	// idleConnectionTicker is created in init and consumed by the goroutine
	// started in startIdleTimer
	idleConnectionTicker *time.Ticker
	// idleTimeout is set by startIdleTimer; CheckIdleConnections closes the
	// connections idle for longer than this
	idleTimeout time.Duration
	// activeQuotaScans lists the quota scans currently running, one per user
	activeQuotaScans []ActiveQuotaScan
	// dataProvider is set by SetDataProvider
	dataProvider dataprovider.Provider
	// actions is the configuration read by executeAction
	actions Actions
	// uploadMode holds one of the uploadMode* constants
	uploadMode int
	// NOTE(review): setstatMode is not referenced in this part of the file;
	// presumably it controls how setstat requests are handled — confirm.
	setstatMode int
)
// connectionTransfer describes a single active upload or download as exposed
// inside ConnectionStatus.
type connectionTransfer struct {
	// operationUpload or operationDownload
	OperationType string `json:"operation_type"`
	// Transfer start time, obtained from utils.GetTimeAsMsSinceEpoch
	StartTime int64 `json:"start_time"`
	// Bytes received so far for an upload, bytes sent so far for a download
	Size int64 `json:"size"`
	// Last activity for this transfer, obtained from utils.GetTimeAsMsSinceEpoch
	LastActivity int64 `json:"last_activity"`
	// Transfer path as returned by the user's GetRelativePath
	Path string `json:"path"`
}
// ActiveQuotaScan defines an active quota scan.
// Entries are added by AddQuotaScan and removed by RemoveQuotaScan.
type ActiveQuotaScan struct {
	// Username to which the quota scan refers
	Username string `json:"username"`
	// Quota scan start time as unix timestamp in milliseconds
	StartTime int64 `json:"start_time"`
}
// Actions defines what to execute on SFTP create, download, delete and rename.
// An external command can be executed and/or an HTTP notification can be fired;
// both are handled by executeAction.
type Actions struct {
	// Valid values are download, upload, delete, rename. Empty slice to disable
	ExecuteOn []string `json:"execute_on" mapstructure:"execute_on"`
	// Absolute path to the command to execute, empty to disable.
	// executeAction skips the command if the path is not absolute
	Command string `json:"command" mapstructure:"command"`
	// The URL to notify using an HTTP GET, empty to disable
	HTTPNotificationURL string `json:"http_notification_url" mapstructure:"http_notification_url"`
}
// ConnectionStatus describes the status of an active connection.
// Values are built by GetConnectionsStats.
type ConnectionStatus struct {
	// Logged in username
	Username string `json:"username"`
	// Unique identifier for the connection
	ConnectionID string `json:"connection_id"`
	// Client's version string
	ClientVersion string `json:"client_version"`
	// Remote address for this connection
	RemoteAddress string `json:"remote_address"`
	// Connection time as unix timestamp in milliseconds
	ConnectionTime int64 `json:"connection_time"`
	// Last activity as unix timestamp in milliseconds
	LastActivity int64 `json:"last_activity"`
	// Protocol for this connection: SFTP or SCP
	Protocol string `json:"protocol"`
	// Active uploads/downloads
	Transfers []connectionTransfer `json:"active_transfers"`
}
// sshSubsystemExitStatus wraps a numeric exit status.
// NOTE(review): not referenced in this part of the file; presumably the payload
// of the SSH "exit-status" request sent when a subsystem terminates — confirm.
type sshSubsystemExitStatus struct {
	Status uint32
}
  102. func init() {
  103. openConnections = make(map[string]Connection)
  104. idleConnectionTicker = time.NewTicker(5 * time.Minute)
  105. }
  106. // GetConnectionDuration returns the connection duration as string
  107. func (c ConnectionStatus) GetConnectionDuration() string {
  108. elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  109. return utils.GetDurationAsString(elapsed)
  110. }
  111. // GetConnectionInfo returns connection info.
  112. // Protocol,Client Version and RemoteAddress are returned
  113. func (c ConnectionStatus) GetConnectionInfo() string {
  114. return fmt.Sprintf("%v. Client: %#v From: %#v", c.Protocol, c.ClientVersion, c.RemoteAddress)
  115. }
  116. // GetTransfersAsString returns the active transfers as string
  117. func (c ConnectionStatus) GetTransfersAsString() string {
  118. result := ""
  119. for _, t := range c.Transfers {
  120. if len(result) > 0 {
  121. result += ". "
  122. }
  123. result += t.getConnectionTransferAsString()
  124. }
  125. return result
  126. }
  127. func (t connectionTransfer) getConnectionTransferAsString() string {
  128. result := ""
  129. if t.OperationType == operationUpload {
  130. result += "UL"
  131. } else {
  132. result += "DL"
  133. }
  134. result += fmt.Sprintf(" %#v ", t.Path)
  135. if t.Size > 0 {
  136. elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(t.StartTime))
  137. speed := float64(t.Size) / float64(utils.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  138. result += fmt.Sprintf("Size: %#v Elapsed: %#v Speed: \"%.1f KB/s\"", utils.ByteCountSI(t.Size),
  139. utils.GetDurationAsString(elapsed), speed)
  140. }
  141. return result
  142. }
  143. // SetDataProvider sets the data provider to use to authenticate users and to get/update their disk quota
  144. func SetDataProvider(provider dataprovider.Provider) {
  145. dataProvider = provider
  146. }
  147. func getActiveSessions(username string) int {
  148. mutex.RLock()
  149. defer mutex.RUnlock()
  150. numSessions := 0
  151. for _, c := range openConnections {
  152. if c.User.Username == username {
  153. numSessions++
  154. }
  155. }
  156. return numSessions
  157. }
  158. // GetQuotaScans returns the active quota scans
  159. func GetQuotaScans() []ActiveQuotaScan {
  160. mutex.RLock()
  161. defer mutex.RUnlock()
  162. scans := make([]ActiveQuotaScan, len(activeQuotaScans))
  163. copy(scans, activeQuotaScans)
  164. return scans
  165. }
  166. // AddQuotaScan add an user to the ones with active quota scans.
  167. // Returns false if the user has a quota scan already running
  168. func AddQuotaScan(username string) bool {
  169. mutex.Lock()
  170. defer mutex.Unlock()
  171. for _, s := range activeQuotaScans {
  172. if s.Username == username {
  173. return false
  174. }
  175. }
  176. activeQuotaScans = append(activeQuotaScans, ActiveQuotaScan{
  177. Username: username,
  178. StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
  179. })
  180. return true
  181. }
  182. // RemoveQuotaScan removes an user from the ones with active quota scans
  183. func RemoveQuotaScan(username string) error {
  184. mutex.Lock()
  185. defer mutex.Unlock()
  186. var err error
  187. indexToRemove := -1
  188. for i, s := range activeQuotaScans {
  189. if s.Username == username {
  190. indexToRemove = i
  191. break
  192. }
  193. }
  194. if indexToRemove >= 0 {
  195. activeQuotaScans[indexToRemove] = activeQuotaScans[len(activeQuotaScans)-1]
  196. activeQuotaScans = activeQuotaScans[:len(activeQuotaScans)-1]
  197. } else {
  198. logger.Warn(logSender, "", "quota scan to remove not found for user: %v", username)
  199. err = fmt.Errorf("quota scan to remove not found for user: %v", username)
  200. }
  201. return err
  202. }
  203. // CloseActiveConnection closes an active SFTP connection.
  204. // It returns true on success
  205. func CloseActiveConnection(connectionID string) bool {
  206. result := false
  207. mutex.RLock()
  208. defer mutex.RUnlock()
  209. if c, ok := openConnections[connectionID]; ok {
  210. err := c.close()
  211. c.Log(logger.LevelDebug, logSender, "close connection requested, close err: %v", err)
  212. result = true
  213. }
  214. return result
  215. }
  216. // GetConnectionsStats returns stats for active connections
  217. func GetConnectionsStats() []ConnectionStatus {
  218. mutex.RLock()
  219. defer mutex.RUnlock()
  220. stats := []ConnectionStatus{}
  221. for _, c := range openConnections {
  222. conn := ConnectionStatus{
  223. Username: c.User.Username,
  224. ConnectionID: c.ID,
  225. ClientVersion: c.ClientVersion,
  226. RemoteAddress: c.RemoteAddr.String(),
  227. ConnectionTime: utils.GetTimeAsMsSinceEpoch(c.StartTime),
  228. LastActivity: utils.GetTimeAsMsSinceEpoch(c.lastActivity),
  229. Protocol: c.protocol,
  230. Transfers: []connectionTransfer{},
  231. }
  232. for _, t := range activeTransfers {
  233. if t.connectionID == c.ID {
  234. if t.lastActivity.UnixNano() > c.lastActivity.UnixNano() {
  235. conn.LastActivity = utils.GetTimeAsMsSinceEpoch(t.lastActivity)
  236. }
  237. var operationType string
  238. var size int64
  239. if t.transferType == transferUpload {
  240. operationType = operationUpload
  241. size = t.bytesReceived
  242. } else {
  243. operationType = operationDownload
  244. size = t.bytesSent
  245. }
  246. connTransfer := connectionTransfer{
  247. OperationType: operationType,
  248. StartTime: utils.GetTimeAsMsSinceEpoch(t.start),
  249. Size: size,
  250. LastActivity: utils.GetTimeAsMsSinceEpoch(t.lastActivity),
  251. Path: c.User.GetRelativePath(t.path),
  252. }
  253. conn.Transfers = append(conn.Transfers, connTransfer)
  254. }
  255. }
  256. stats = append(stats, conn)
  257. }
  258. return stats
  259. }
  260. func startIdleTimer(maxIdleTime time.Duration) {
  261. idleTimeout = maxIdleTime
  262. go func() {
  263. for t := range idleConnectionTicker.C {
  264. logger.Debug(logSender, "", "idle connections check ticker %v", t)
  265. CheckIdleConnections()
  266. }
  267. }()
  268. }
  269. // CheckIdleConnections disconnects clients idle for too long, based on IdleTimeout setting
  270. func CheckIdleConnections() {
  271. mutex.RLock()
  272. defer mutex.RUnlock()
  273. for _, c := range openConnections {
  274. idleTime := time.Since(c.lastActivity)
  275. for _, t := range activeTransfers {
  276. if t.connectionID == c.ID {
  277. transferIdleTime := time.Since(t.lastActivity)
  278. if transferIdleTime < idleTime {
  279. c.Log(logger.LevelDebug, logSender, "idle time: %v setted to transfer idle time: %v",
  280. idleTime, transferIdleTime)
  281. idleTime = transferIdleTime
  282. }
  283. }
  284. }
  285. if idleTime > idleTimeout {
  286. err := c.close()
  287. c.Log(logger.LevelInfo, logSender, "close idle connection, idle time: %v, close error: %v", idleTime, err)
  288. }
  289. }
  290. logger.Debug(logSender, "", "check idle connections ended")
  291. }
  292. func addConnection(c Connection) {
  293. mutex.Lock()
  294. defer mutex.Unlock()
  295. openConnections[c.ID] = c
  296. metrics.UpdateActiveConnectionsSize(len(openConnections))
  297. c.Log(logger.LevelDebug, logSender, "connection added, num open connections: %v", len(openConnections))
  298. }
  299. func removeConnection(c Connection) {
  300. mutex.Lock()
  301. defer mutex.Unlock()
  302. delete(openConnections, c.ID)
  303. metrics.UpdateActiveConnectionsSize(len(openConnections))
  304. // we have finished to send data here and most of the time the underlying network connection
  305. // is already closed. Sometime a client can still be reading the last sended data, so we set
  306. // a deadline instead of directly closing the network connection.
  307. // Setting a deadline on an already closed connection has no effect.
  308. // We only need to ensure that a connection will not remain indefinitely open and so the
  309. // underlying file descriptor is not released.
  310. // This should protect us against buggy clients and edge cases.
  311. c.netConn.SetDeadline(time.Now().Add(2 * time.Minute))
  312. c.Log(logger.LevelDebug, logSender, "connection removed, num open connections: %v", len(openConnections))
  313. }
  314. func addTransfer(transfer *Transfer) {
  315. mutex.Lock()
  316. defer mutex.Unlock()
  317. activeTransfers = append(activeTransfers, transfer)
  318. }
  319. func removeTransfer(transfer *Transfer) error {
  320. mutex.Lock()
  321. defer mutex.Unlock()
  322. var err error
  323. indexToRemove := -1
  324. for i, v := range activeTransfers {
  325. if v == transfer {
  326. indexToRemove = i
  327. break
  328. }
  329. }
  330. if indexToRemove >= 0 {
  331. activeTransfers[indexToRemove] = activeTransfers[len(activeTransfers)-1]
  332. activeTransfers = activeTransfers[:len(activeTransfers)-1]
  333. } else {
  334. logger.Warn(logSender, transfer.connectionID, "transfer to remove not found!")
  335. err = fmt.Errorf("transfer to remove not found")
  336. }
  337. return err
  338. }
  339. func updateConnectionActivity(id string) {
  340. mutex.Lock()
  341. defer mutex.Unlock()
  342. if c, ok := openConnections[id]; ok {
  343. c.lastActivity = time.Now()
  344. openConnections[id] = c
  345. }
  346. }
  347. func isAtomicUploadEnabled() bool {
  348. return uploadMode == uploadModeAtomic || uploadMode == uploadModeAtomicWithResume
  349. }
  350. // executed in a goroutine
  351. func executeAction(operation string, username string, path string, target string) error {
  352. if !utils.IsStringInSlice(operation, actions.ExecuteOn) {
  353. return nil
  354. }
  355. var err error
  356. if len(actions.Command) > 0 && filepath.IsAbs(actions.Command) {
  357. if _, err = os.Stat(actions.Command); err == nil {
  358. command := exec.Command(actions.Command, operation, username, path, target)
  359. err = command.Start()
  360. logger.Debug(logSender, "", "start command %#v with arguments: %v, %v, %v, %v, error: %v",
  361. actions.Command, operation, username, path, target, err)
  362. if err == nil {
  363. // we are in a goroutine but we don't want to block here, this way we can send the
  364. // HTTP notification, if configured, without waiting the end of the command
  365. go command.Wait()
  366. }
  367. } else {
  368. logger.Warn(logSender, "", "Invalid action command %#v for operation %#v: %v", actions.Command, operation, err)
  369. }
  370. }
  371. if len(actions.HTTPNotificationURL) > 0 {
  372. var url *url.URL
  373. url, err = url.Parse(actions.HTTPNotificationURL)
  374. if err == nil {
  375. q := url.Query()
  376. q.Add("action", operation)
  377. q.Add("username", username)
  378. q.Add("path", path)
  379. if len(target) > 0 {
  380. q.Add("target_path", target)
  381. }
  382. url.RawQuery = q.Encode()
  383. startTime := time.Now()
  384. httpClient := &http.Client{
  385. Timeout: 15 * time.Second,
  386. }
  387. resp, err := httpClient.Get(url.String())
  388. respCode := 0
  389. if err == nil {
  390. respCode = resp.StatusCode
  391. resp.Body.Close()
  392. }
  393. logger.Debug(logSender, "", "notified operation %#v to URL: %v status code: %v, elapsed: %v err: %v",
  394. operation, url.String(), respCode, time.Since(startTime), err)
  395. } else {
  396. logger.Warn(logSender, "", "Invalid http_notification_url %#v for operation %#v: %v", actions.HTTPNotificationURL,
  397. operation, err)
  398. }
  399. }
  400. return err
  401. }