// Package sftpd implements the SSH File Transfer Protocol as described in
// https://tools.ietf.org/html/draft-ietf-secsh-filexfer-02.
// It uses the pkg/sftp library:
// https://github.com/pkg/sftp
package sftpd
import (
	"fmt"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"time"

	"github.com/drakkan/sftpgo/dataprovider"
	"github.com/drakkan/sftpgo/logger"
	"github.com/drakkan/sftpgo/utils"
)
  18. const (
  19. logSender = "sftpd"
  20. sftpUploadLogSender = "SFTPUpload"
  21. sftpdDownloadLogSender = "SFTPDownload"
  22. sftpdRenameLogSender = "SFTPRename"
  23. sftpdRmdirLogSender = "SFTPRmdir"
  24. sftpdMkdirLogSender = "SFTPMkdir"
  25. sftpdSymlinkLogSender = "SFTPSymlink"
  26. sftpdRemoveLogSender = "SFTPRemove"
  27. operationDownload = "download"
  28. operationUpload = "upload"
  29. operationDelete = "delete"
  30. operationRename = "rename"
  31. )
  32. var (
  33. mutex sync.RWMutex
  34. openConnections map[string]Connection
  35. activeTransfers []*Transfer
  36. idleConnectionTicker *time.Ticker
  37. idleTimeout time.Duration
  38. activeQuotaScans []ActiveQuotaScan
  39. dataProvider dataprovider.Provider
  40. actions Actions
  41. uploadMode int
  42. )
  43. type connectionTransfer struct {
  44. OperationType string `json:"operation_type"`
  45. StartTime int64 `json:"start_time"`
  46. Size int64 `json:"size"`
  47. LastActivity int64 `json:"last_activity"`
  48. Path string `json:"path"`
  49. }
  50. // ActiveQuotaScan defines an active quota scan
  51. type ActiveQuotaScan struct {
  52. // Username to which the quota scan refers
  53. Username string `json:"username"`
  54. // quota scan start time as unix timestamp in milliseconds
  55. StartTime int64 `json:"start_time"`
  56. }
  57. // Actions to execute on SFTP create, download, delete and rename.
  58. // An external command can be executed and/or an HTTP notification can be fired
  59. type Actions struct {
  60. // Valid values are download, upload, delete, rename. Empty slice to disable
  61. ExecuteOn []string `json:"execute_on" mapstructure:"execute_on"`
  62. // Absolute path to the command to execute, empty to disable
  63. Command string `json:"command" mapstructure:"command"`
  64. // The URL to notify using an HTTP GET, empty to disable
  65. HTTPNotificationURL string `json:"http_notification_url" mapstructure:"http_notification_url"`
  66. }
  67. // ConnectionStatus status for an active connection
  68. type ConnectionStatus struct {
  69. // Logged in username
  70. Username string `json:"username"`
  71. // Unique identifier for the connection
  72. ConnectionID string `json:"connection_id"`
  73. // client's version string
  74. ClientVersion string `json:"client_version"`
  75. // Remote address for this connection
  76. RemoteAddress string `json:"remote_address"`
  77. // Connection time as unix timestamp in milliseconds
  78. ConnectionTime int64 `json:"connection_time"`
  79. // Last activity as unix timestamp in milliseconds
  80. LastActivity int64 `json:"last_activity"`
  81. // active uploads/downloads
  82. Transfers []connectionTransfer `json:"active_transfers"`
  83. }
  84. func init() {
  85. openConnections = make(map[string]Connection)
  86. idleConnectionTicker = time.NewTicker(5 * time.Minute)
  87. }
  88. // SetDataProvider sets the data provider to use to authenticate users and to get/update their disk quota
  89. func SetDataProvider(provider dataprovider.Provider) {
  90. dataProvider = provider
  91. }
  92. func getActiveSessions(username string) int {
  93. mutex.RLock()
  94. defer mutex.RUnlock()
  95. numSessions := 0
  96. for _, c := range openConnections {
  97. if c.User.Username == username {
  98. numSessions++
  99. }
  100. }
  101. return numSessions
  102. }
  103. // GetQuotaScans returns the active quota scans
  104. func GetQuotaScans() []ActiveQuotaScan {
  105. mutex.RLock()
  106. defer mutex.RUnlock()
  107. scans := make([]ActiveQuotaScan, len(activeQuotaScans))
  108. copy(scans, activeQuotaScans)
  109. return scans
  110. }
  111. // AddQuotaScan add an user to the ones with active quota scans.
  112. // Returns false if the user has a quota scan already running
  113. func AddQuotaScan(username string) bool {
  114. mutex.Lock()
  115. defer mutex.Unlock()
  116. for _, s := range activeQuotaScans {
  117. if s.Username == username {
  118. return false
  119. }
  120. }
  121. activeQuotaScans = append(activeQuotaScans, ActiveQuotaScan{
  122. Username: username,
  123. StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
  124. })
  125. return true
  126. }
  127. // RemoveQuotaScan removes an user from the ones with active quota scans
  128. func RemoveQuotaScan(username string) error {
  129. mutex.Lock()
  130. defer mutex.Unlock()
  131. var err error
  132. indexToRemove := -1
  133. for i, s := range activeQuotaScans {
  134. if s.Username == username {
  135. indexToRemove = i
  136. break
  137. }
  138. }
  139. if indexToRemove >= 0 {
  140. activeQuotaScans[indexToRemove] = activeQuotaScans[len(activeQuotaScans)-1]
  141. activeQuotaScans = activeQuotaScans[:len(activeQuotaScans)-1]
  142. } else {
  143. logger.Warn(logSender, "quota scan to remove not found for user: %v", username)
  144. err = fmt.Errorf("quota scan to remove not found for user: %v", username)
  145. }
  146. return err
  147. }
  148. // CloseActiveConnection closes an active SFTP connection.
  149. // It returns true on success
  150. func CloseActiveConnection(connectionID string) bool {
  151. result := false
  152. mutex.RLock()
  153. defer mutex.RUnlock()
  154. for _, c := range openConnections {
  155. if c.ID == connectionID {
  156. logger.Debug(logSender, "closing connection with id: %v", connectionID)
  157. c.sshConn.Close()
  158. result = true
  159. break
  160. }
  161. }
  162. return result
  163. }
  164. // GetConnectionsStats returns stats for active connections
  165. func GetConnectionsStats() []ConnectionStatus {
  166. mutex.RLock()
  167. defer mutex.RUnlock()
  168. stats := []ConnectionStatus{}
  169. for _, c := range openConnections {
  170. conn := ConnectionStatus{
  171. Username: c.User.Username,
  172. ConnectionID: c.ID,
  173. ClientVersion: c.ClientVersion,
  174. RemoteAddress: c.RemoteAddr.String(),
  175. ConnectionTime: utils.GetTimeAsMsSinceEpoch(c.StartTime),
  176. LastActivity: utils.GetTimeAsMsSinceEpoch(c.lastActivity),
  177. Transfers: []connectionTransfer{},
  178. }
  179. for _, t := range activeTransfers {
  180. if t.connectionID == c.ID {
  181. if t.lastActivity.UnixNano() > c.lastActivity.UnixNano() {
  182. conn.LastActivity = utils.GetTimeAsMsSinceEpoch(t.lastActivity)
  183. }
  184. var operationType string
  185. var size int64
  186. if t.transferType == transferUpload {
  187. operationType = operationUpload
  188. size = t.bytesReceived
  189. } else {
  190. operationType = operationDownload
  191. size = t.bytesSent
  192. }
  193. connTransfer := connectionTransfer{
  194. OperationType: operationType,
  195. StartTime: utils.GetTimeAsMsSinceEpoch(t.start),
  196. Size: size,
  197. LastActivity: utils.GetTimeAsMsSinceEpoch(t.lastActivity),
  198. Path: c.User.GetRelativePath(t.path),
  199. }
  200. conn.Transfers = append(conn.Transfers, connTransfer)
  201. }
  202. }
  203. stats = append(stats, conn)
  204. }
  205. return stats
  206. }
  207. func startIdleTimer(maxIdleTime time.Duration) {
  208. idleTimeout = maxIdleTime
  209. go func() {
  210. for t := range idleConnectionTicker.C {
  211. logger.Debug(logSender, "idle connections check ticker %v", t)
  212. CheckIdleConnections()
  213. }
  214. }()
  215. }
  216. // CheckIdleConnections disconnects clients idle for too long, based on IdleTimeout setting
  217. func CheckIdleConnections() {
  218. mutex.RLock()
  219. defer mutex.RUnlock()
  220. for _, c := range openConnections {
  221. idleTime := time.Since(c.lastActivity)
  222. for _, t := range activeTransfers {
  223. if t.connectionID == c.ID {
  224. transferIdleTime := time.Since(t.lastActivity)
  225. if transferIdleTime < idleTime {
  226. logger.Debug(logSender, "idle time: %v setted to transfer idle time: %v connection id: %v",
  227. idleTime, transferIdleTime, c.ID)
  228. idleTime = transferIdleTime
  229. }
  230. }
  231. }
  232. if idleTime > idleTimeout {
  233. logger.Debug(logSender, "close idle connection id: %v idle time: %v", c.ID, idleTime)
  234. err := c.sshConn.Close()
  235. if err != nil {
  236. logger.Warn(logSender, "error closing idle connection: %v", err)
  237. }
  238. }
  239. }
  240. logger.Debug(logSender, "check idle connections ended")
  241. }
  242. func addConnection(id string, conn Connection) {
  243. mutex.Lock()
  244. defer mutex.Unlock()
  245. openConnections[id] = conn
  246. logger.Debug(logSender, "connection added, num open connections: %v", len(openConnections))
  247. }
  248. func removeConnection(id string) {
  249. mutex.Lock()
  250. defer mutex.Unlock()
  251. delete(openConnections, id)
  252. logger.Debug(logSender, "connection removed, num open connections: %v", len(openConnections))
  253. }
  254. func addTransfer(transfer *Transfer) {
  255. mutex.Lock()
  256. defer mutex.Unlock()
  257. activeTransfers = append(activeTransfers, transfer)
  258. }
  259. func removeTransfer(transfer *Transfer) error {
  260. mutex.Lock()
  261. defer mutex.Unlock()
  262. var err error
  263. indexToRemove := -1
  264. for i, v := range activeTransfers {
  265. if v == transfer {
  266. indexToRemove = i
  267. break
  268. }
  269. }
  270. if indexToRemove >= 0 {
  271. activeTransfers[indexToRemove] = activeTransfers[len(activeTransfers)-1]
  272. activeTransfers = activeTransfers[:len(activeTransfers)-1]
  273. } else {
  274. logger.Warn(logSender, "transfer to remove not found!")
  275. err = fmt.Errorf("transfer to remove not found")
  276. }
  277. return err
  278. }
  279. func updateConnectionActivity(id string) {
  280. mutex.Lock()
  281. defer mutex.Unlock()
  282. if c, ok := openConnections[id]; ok {
  283. c.lastActivity = time.Now()
  284. openConnections[id] = c
  285. }
  286. }
  287. func executeAction(operation string, username string, path string, target string) error {
  288. if !utils.IsStringInSlice(operation, actions.ExecuteOn) {
  289. return nil
  290. }
  291. var err error
  292. if len(actions.Command) > 0 && filepath.IsAbs(actions.Command) {
  293. if _, err = os.Stat(actions.Command); err == nil {
  294. command := exec.Command(actions.Command, operation, username, path, target)
  295. err = command.Start()
  296. logger.Debug(logSender, "executed command \"%v\" with arguments: %v, %v, %v, %v, error: %v",
  297. actions.Command, operation, username, path, target, err)
  298. } else {
  299. logger.Warn(logSender, "Invalid action command \"%v\" : %v", actions.Command, err)
  300. }
  301. }
  302. if len(actions.HTTPNotificationURL) > 0 {
  303. var url *url.URL
  304. url, err = url.Parse(actions.HTTPNotificationURL)
  305. if err == nil {
  306. q := url.Query()
  307. q.Add("action", operation)
  308. q.Add("username", username)
  309. q.Add("path", path)
  310. if len(target) > 0 {
  311. q.Add("target_path", target)
  312. }
  313. url.RawQuery = q.Encode()
  314. go func() {
  315. startTime := time.Now()
  316. httpClient := &http.Client{
  317. Timeout: 15 * time.Second,
  318. }
  319. resp, err := httpClient.Get(url.String())
  320. respCode := 0
  321. if err == nil {
  322. respCode = resp.StatusCode
  323. resp.Body.Close()
  324. }
  325. logger.Debug(logSender, "notified action to URL: %v status code: %v, elapsed: %v err: %v",
  326. url.String(), respCode, time.Since(startTime), err)
  327. }()
  328. } else {
  329. logger.Warn(logSender, "Invalid http_notification_url \"%v\" : %v", actions.HTTPNotificationURL, err)
  330. }
  331. }
  332. return err
  333. }