sftpd.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. package sftpd
  2. import (
  3. "fmt"
  4. "net/http"
  5. "net/url"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "sync"
  10. "time"
  11. "github.com/drakkan/sftpgo/dataprovider"
  12. "github.com/drakkan/sftpgo/logger"
  13. "github.com/drakkan/sftpgo/utils"
  14. )
  15. const (
  16. logSender = "sftpd"
  17. sftpUploadLogSender = "SFTPUpload"
  18. sftpdDownloadLogSender = "SFTPDownload"
  19. sftpdRenameLogSender = "SFTPRename"
  20. sftpdRmdirLogSender = "SFTPRmdir"
  21. sftpdMkdirLogSender = "SFTPMkdir"
  22. sftpdSymlinkLogSender = "SFTPSymlink"
  23. sftpdRemoveLogSender = "SFTPRemove"
  24. operationDownload = "download"
  25. operationUpload = "upload"
  26. operationDelete = "delete"
  27. operationRename = "rename"
  28. )
  29. var (
  30. mutex sync.RWMutex
  31. openConnections map[string]Connection
  32. activeTransfers []*Transfer
  33. idleConnectionTicker *time.Ticker
  34. idleTimeout time.Duration
  35. activeQuotaScans []ActiveQuotaScan
  36. dataProvider dataprovider.Provider
  37. actions Actions
  38. )
  39. type connectionTransfer struct {
  40. OperationType string `json:"operation_type"`
  41. StartTime int64 `json:"start_time"`
  42. Size int64 `json:"size"`
  43. LastActivity int64 `json:"last_activity"`
  44. }
  45. // ActiveQuotaScan username and start data for a quota scan
  46. type ActiveQuotaScan struct {
  47. Username string `json:"username"`
  48. StartTime int64 `json:"start_time"`
  49. }
  50. // Actions configuration for external script to execute on create, download, delete.
  51. // A rename trigger delete script for the old file and create script for the new one
  52. type Actions struct {
  53. ExecuteOn []string `json:"execute_on"`
  54. Command string `json:"command"`
  55. HTTPNotificationURL string `json:"http_notification_url"`
  56. }
  57. // ConnectionStatus status for an active connection
  58. type ConnectionStatus struct {
  59. Username string `json:"username"`
  60. ConnectionID string `json:"connection_id"`
  61. ClientVersion string `json:"client_version"`
  62. RemoteAddress string `json:"remote_address"`
  63. ConnectionTime int64 `json:"connection_time"`
  64. LastActivity int64 `json:"last_activity"`
  65. Transfers []connectionTransfer `json:"active_transfers"`
  66. }
  67. func init() {
  68. openConnections = make(map[string]Connection)
  69. idleConnectionTicker = time.NewTicker(5 * time.Minute)
  70. }
  71. // SetDataProvider sets the data provider
  72. func SetDataProvider(provider dataprovider.Provider) {
  73. dataProvider = provider
  74. }
  75. func getActiveSessions(username string) int {
  76. mutex.RLock()
  77. defer mutex.RUnlock()
  78. numSessions := 0
  79. for _, c := range openConnections {
  80. if c.User.Username == username {
  81. numSessions++
  82. }
  83. }
  84. return numSessions
  85. }
  86. // GetQuotaScans returns the active quota scans
  87. func GetQuotaScans() []ActiveQuotaScan {
  88. mutex.RLock()
  89. defer mutex.RUnlock()
  90. scans := make([]ActiveQuotaScan, len(activeQuotaScans))
  91. copy(scans, activeQuotaScans)
  92. return scans
  93. }
  94. // AddQuotaScan add an user to the ones with active quota scans.
  95. // Returns false if the user has a quota scan already running
  96. func AddQuotaScan(username string) bool {
  97. mutex.Lock()
  98. defer mutex.Unlock()
  99. for _, s := range activeQuotaScans {
  100. if s.Username == username {
  101. return false
  102. }
  103. }
  104. activeQuotaScans = append(activeQuotaScans, ActiveQuotaScan{
  105. Username: username,
  106. StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
  107. })
  108. return true
  109. }
  110. // RemoveQuotaScan remove and user from the ones with active quota scans
  111. func RemoveQuotaScan(username string) error {
  112. mutex.Lock()
  113. defer mutex.Unlock()
  114. var err error
  115. indexToRemove := -1
  116. for i, s := range activeQuotaScans {
  117. if s.Username == username {
  118. indexToRemove = i
  119. break
  120. }
  121. }
  122. if indexToRemove >= 0 {
  123. activeQuotaScans[indexToRemove] = activeQuotaScans[len(activeQuotaScans)-1]
  124. activeQuotaScans = activeQuotaScans[:len(activeQuotaScans)-1]
  125. } else {
  126. logger.Warn(logSender, "quota scan to remove not found for user: %v", username)
  127. err = fmt.Errorf("quota scan to remove not found for user: %v", username)
  128. }
  129. return err
  130. }
  131. // CloseActiveConnection close an active SFTP connection, returns true on success
  132. func CloseActiveConnection(connectionID string) bool {
  133. result := false
  134. mutex.RLock()
  135. defer mutex.RUnlock()
  136. for _, c := range openConnections {
  137. if c.ID == connectionID {
  138. logger.Debug(logSender, "closing connection with id: %v", connectionID)
  139. c.sshConn.Close()
  140. result = true
  141. break
  142. }
  143. }
  144. return result
  145. }
  146. // GetConnectionsStats returns stats for active connections
  147. func GetConnectionsStats() []ConnectionStatus {
  148. mutex.RLock()
  149. defer mutex.RUnlock()
  150. stats := []ConnectionStatus{}
  151. for _, c := range openConnections {
  152. conn := ConnectionStatus{
  153. Username: c.User.Username,
  154. ConnectionID: c.ID,
  155. ClientVersion: c.ClientVersion,
  156. RemoteAddress: c.RemoteAddr.String(),
  157. ConnectionTime: utils.GetTimeAsMsSinceEpoch(c.StartTime),
  158. LastActivity: utils.GetTimeAsMsSinceEpoch(c.lastActivity),
  159. Transfers: []connectionTransfer{},
  160. }
  161. for _, t := range activeTransfers {
  162. if t.connectionID == c.ID {
  163. if utils.GetTimeAsMsSinceEpoch(t.lastActivity) > conn.LastActivity {
  164. conn.LastActivity = utils.GetTimeAsMsSinceEpoch(t.lastActivity)
  165. }
  166. var operationType string
  167. var size int64
  168. if t.transferType == transferUpload {
  169. operationType = operationUpload
  170. size = t.bytesReceived
  171. } else {
  172. operationType = operationDownload
  173. size = t.bytesSent
  174. }
  175. connTransfer := connectionTransfer{
  176. OperationType: operationType,
  177. StartTime: utils.GetTimeAsMsSinceEpoch(t.start),
  178. Size: size,
  179. LastActivity: utils.GetTimeAsMsSinceEpoch(t.lastActivity),
  180. }
  181. conn.Transfers = append(conn.Transfers, connTransfer)
  182. }
  183. }
  184. stats = append(stats, conn)
  185. }
  186. return stats
  187. }
  188. func startIdleTimer(maxIdleTime time.Duration) {
  189. idleTimeout = maxIdleTime
  190. go func() {
  191. for t := range idleConnectionTicker.C {
  192. logger.Debug(logSender, "idle connections check ticker %v", t)
  193. CheckIdleConnections()
  194. }
  195. }()
  196. }
  197. // CheckIdleConnections disconnects idle clients
  198. func CheckIdleConnections() {
  199. mutex.RLock()
  200. defer mutex.RUnlock()
  201. for _, c := range openConnections {
  202. idleTime := time.Since(c.lastActivity)
  203. for _, t := range activeTransfers {
  204. if t.connectionID == c.ID {
  205. transferIdleTime := time.Since(t.lastActivity)
  206. if transferIdleTime < idleTime {
  207. logger.Debug(logSender, "idle time: %v setted to transfer idle time: %v connection id: %v",
  208. idleTime, transferIdleTime, c.ID)
  209. idleTime = transferIdleTime
  210. }
  211. }
  212. }
  213. if idleTime > idleTimeout {
  214. logger.Debug(logSender, "close idle connection id: %v idle time: %v", c.ID, idleTime)
  215. err := c.sshConn.Close()
  216. if err != nil {
  217. logger.Warn(logSender, "error closing idle connection: %v", err)
  218. }
  219. }
  220. }
  221. logger.Debug(logSender, "check idle connections ended")
  222. }
  223. func addConnection(id string, conn Connection) {
  224. mutex.Lock()
  225. defer mutex.Unlock()
  226. openConnections[id] = conn
  227. logger.Debug(logSender, "connection added, num open connections: %v", len(openConnections))
  228. }
  229. func removeConnection(id string) {
  230. mutex.Lock()
  231. defer mutex.Unlock()
  232. delete(openConnections, id)
  233. logger.Debug(logSender, "connection removed, num open connections: %v", len(openConnections))
  234. }
  235. func addTransfer(transfer *Transfer) {
  236. mutex.Lock()
  237. defer mutex.Unlock()
  238. activeTransfers = append(activeTransfers, transfer)
  239. }
  240. func removeTransfer(transfer *Transfer) error {
  241. mutex.Lock()
  242. defer mutex.Unlock()
  243. var err error
  244. indexToRemove := -1
  245. for i, v := range activeTransfers {
  246. if v == transfer {
  247. indexToRemove = i
  248. break
  249. }
  250. }
  251. if indexToRemove >= 0 {
  252. activeTransfers[indexToRemove] = activeTransfers[len(activeTransfers)-1]
  253. activeTransfers = activeTransfers[:len(activeTransfers)-1]
  254. } else {
  255. logger.Warn(logSender, "transfer to remove not found!")
  256. err = fmt.Errorf("transfer to remove not found")
  257. }
  258. return err
  259. }
  260. func updateConnectionActivity(id string) {
  261. mutex.Lock()
  262. defer mutex.Unlock()
  263. if c, ok := openConnections[id]; ok {
  264. c.lastActivity = time.Now()
  265. openConnections[id] = c
  266. }
  267. }
  268. func executeAction(operation string, username string, path string, target string) error {
  269. if !utils.IsStringInSlice(operation, actions.ExecuteOn) {
  270. return nil
  271. }
  272. var err error
  273. if len(actions.Command) > 0 && filepath.IsAbs(actions.Command) {
  274. if _, err = os.Stat(actions.Command); err == nil {
  275. command := exec.Command(actions.Command, operation, username, path, target)
  276. err = command.Start()
  277. logger.Debug(logSender, "executed command \"%v\" with arguments: %v, %v, %v, %v, error: %v",
  278. actions.Command, operation, username, path, target, err)
  279. } else {
  280. logger.Warn(logSender, "Invalid action command \"%v\" : %v", actions.Command, err)
  281. }
  282. }
  283. if len(actions.HTTPNotificationURL) > 0 {
  284. var url *url.URL
  285. url, err = url.Parse(actions.HTTPNotificationURL)
  286. if err == nil {
  287. q := url.Query()
  288. q.Add("action", operation)
  289. q.Add("username", username)
  290. q.Add("path", path)
  291. if len(target) > 0 {
  292. q.Add("target_path", target)
  293. }
  294. url.RawQuery = q.Encode()
  295. go func() {
  296. startTime := time.Now()
  297. httpClient := &http.Client{
  298. Timeout: 15 * time.Second,
  299. }
  300. resp, err := httpClient.Get(url.String())
  301. respCode := 0
  302. if err == nil {
  303. respCode = resp.StatusCode
  304. resp.Body.Close()
  305. }
  306. logger.Debug(logSender, "notified action to URL: %v status code: %v, elapsed: %v err: %v",
  307. url.String(), respCode, time.Since(startTime), err)
  308. }()
  309. } else {
  310. logger.Warn(logSender, "Invalid http_notification_url \"%v\" : %v", actions.HTTPNotificationURL, err)
  311. }
  312. }
  313. return err
  314. }