sftpd.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package sftpd
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/drakkan/sftpgo/dataprovider"
  6. "github.com/drakkan/sftpgo/logger"
  7. "github.com/drakkan/sftpgo/utils"
  8. )
// Log sender names and operation type labels used by the sftpd package.
const (
	// logSender is the sender name passed to the logger by the functions in this file
	logSender = "sftpd"
	// sender names for the single SFTP operations.
	// NOTE(review): they are not referenced in this part of the file, presumably
	// they are used by the request handlers - confirm
	sftpUploadLogSender    = "SFTPUpload"
	sftpdDownloadLogSender = "SFTPDownload"
	sftpdRenameLogSender   = "SFTPRename"
	sftpdRmdirLogSender    = "SFTPRmdir"
	sftpdMkdirLogSender    = "SFTPMkdir"
	sftpdSymlinkLogSender  = "SFTPSymlink"
	sftpdRemoveLogSender   = "SFTPRemove"
	// operation type labels reported in connectionTransfer.OperationType
	operationDownload = "download"
	operationUpload   = "upload"
)
// Package level state shared by the functions in this file.
var (
	// mutex is locked by the functions in this file before they access
	// openConnections, activeTransfers or activeQuotaScans
	mutex sync.RWMutex
	// openConnections holds the tracked connections, indexed by the id given to addConnection
	openConnections map[string]Connection
	// activeTransfers lists the transfers registered with addTransfer and not yet removed
	activeTransfers []*Transfer
	// idleConnectionTicker triggers the periodic idle connections check, it is set by startIdleTimer
	idleConnectionTicker *time.Ticker
	// idleTimeout is the idle time above which checkIdleConnections closes a connection
	idleTimeout time.Duration
	// activeQuotaScans lists the quota scans added with AddQuotaScan and not yet removed
	activeQuotaScans []ActiveQuotaScan
	// dataProvider is the provider stored by SetDataProvider
	dataProvider dataprovider.Provider
)
// connectionTransfer describes a single active transfer reported inside a ConnectionStatus
type connectionTransfer struct {
	// OperationType is operationUpload or operationDownload
	OperationType string `json:"operation_type"`
	// StartTime is the transfer start time converted with utils.GetTimeAsMsSinceEpoch
	StartTime int64 `json:"start_time"`
	// Size is the number of bytes received for an upload or sent for a download
	Size int64 `json:"size"`
	// LastActivity is the transfer last activity time converted with utils.GetTimeAsMsSinceEpoch
	LastActivity int64 `json:"last_activity"`
}
// ActiveQuotaScan defines the username and the start time for a running quota scan
type ActiveQuotaScan struct {
	// Username is the user the quota scan was added for
	Username string `json:"username"`
	// StartTime is the time the scan was added, converted with utils.GetTimeAsMsSinceEpoch
	StartTime int64 `json:"start_time"`
}
// ConnectionStatus defines the status for an active connection, as returned by GetConnectionsStats
type ConnectionStatus struct {
	// Username is the name of the user that owns the connection
	Username string `json:"username"`
	// ConnectionID is the connection identifier, the same value accepted by CloseActiveConnection
	ConnectionID string `json:"connection_id"`
	// ClientVersion is the client version stored in the connection
	ClientVersion string `json:"client_version"`
	// RemoteAddress is the string form of the connection remote address
	RemoteAddress string `json:"remote_address"`
	// ConnectionTime is the connection start time converted with utils.GetTimeAsMsSinceEpoch
	ConnectionTime int64 `json:"connection_time"`
	// LastActivity is the most recent value between the connection last activity
	// and the last activity of its transfers, converted with utils.GetTimeAsMsSinceEpoch
	LastActivity int64 `json:"last_activity"`
	// Transfers lists the active transfers for this connection, it is empty and
	// not nil if there are none
	Transfers []connectionTransfer `json:"active_transfers"`
}
  51. func init() {
  52. openConnections = make(map[string]Connection)
  53. }
// SetDataProvider sets the data provider used by this package.
// The provider is stored in a package level variable, the assignment is
// done without acquiring the package mutex.
func SetDataProvider(provider dataprovider.Provider) {
	dataProvider = provider
}
  58. func getActiveSessions(username string) int {
  59. mutex.RLock()
  60. defer mutex.RUnlock()
  61. numSessions := 0
  62. for _, c := range openConnections {
  63. if c.User.Username == username {
  64. numSessions++
  65. }
  66. }
  67. return numSessions
  68. }
  69. // GetQuotaScans returns the active quota scans
  70. func GetQuotaScans() []ActiveQuotaScan {
  71. mutex.RLock()
  72. defer mutex.RUnlock()
  73. scans := make([]ActiveQuotaScan, len(activeQuotaScans))
  74. copy(scans, activeQuotaScans)
  75. return scans
  76. }
  77. // AddQuotaScan add an user to the ones with active quota scans.
  78. // Returns false if the user has a quota scan already running
  79. func AddQuotaScan(username string) bool {
  80. mutex.Lock()
  81. defer mutex.Unlock()
  82. for _, s := range activeQuotaScans {
  83. if s.Username == username {
  84. return false
  85. }
  86. }
  87. activeQuotaScans = append(activeQuotaScans, ActiveQuotaScan{
  88. Username: username,
  89. StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
  90. })
  91. return true
  92. }
  93. // RemoveQuotaScan remove and user from the ones with active quota scans
  94. func RemoveQuotaScan(username string) {
  95. mutex.Lock()
  96. defer mutex.Unlock()
  97. indexToRemove := -1
  98. for i, s := range activeQuotaScans {
  99. if s.Username == username {
  100. indexToRemove = i
  101. break
  102. }
  103. }
  104. if indexToRemove >= 0 {
  105. activeQuotaScans[indexToRemove] = activeQuotaScans[len(activeQuotaScans)-1]
  106. activeQuotaScans = activeQuotaScans[:len(activeQuotaScans)-1]
  107. }
  108. }
  109. // CloseActiveConnection close an active SFTP connection, returns true on success
  110. func CloseActiveConnection(connectionID string) bool {
  111. result := false
  112. mutex.RLock()
  113. defer mutex.RUnlock()
  114. for _, c := range openConnections {
  115. if c.ID == connectionID {
  116. logger.Debug(logSender, "closing connection with id: %v", connectionID)
  117. c.sshConn.Close()
  118. result = true
  119. break
  120. }
  121. }
  122. return result
  123. }
  124. // GetConnectionsStats returns stats for active connections
  125. func GetConnectionsStats() []ConnectionStatus {
  126. mutex.RLock()
  127. defer mutex.RUnlock()
  128. stats := []ConnectionStatus{}
  129. for _, c := range openConnections {
  130. conn := ConnectionStatus{
  131. Username: c.User.Username,
  132. ConnectionID: c.ID,
  133. ClientVersion: c.ClientVersion,
  134. RemoteAddress: c.RemoteAddr.String(),
  135. ConnectionTime: utils.GetTimeAsMsSinceEpoch(c.StartTime),
  136. LastActivity: utils.GetTimeAsMsSinceEpoch(c.lastActivity),
  137. Transfers: []connectionTransfer{},
  138. }
  139. for _, t := range activeTransfers {
  140. if t.connectionID == c.ID {
  141. if utils.GetTimeAsMsSinceEpoch(t.lastActivity) > conn.LastActivity {
  142. conn.LastActivity = utils.GetTimeAsMsSinceEpoch(t.lastActivity)
  143. }
  144. var operationType string
  145. var size int64
  146. if t.transferType == transferUpload {
  147. operationType = operationUpload
  148. size = t.bytesReceived
  149. } else {
  150. operationType = operationDownload
  151. size = t.bytesSent
  152. }
  153. connTransfer := connectionTransfer{
  154. OperationType: operationType,
  155. StartTime: utils.GetTimeAsMsSinceEpoch(t.start),
  156. Size: size,
  157. LastActivity: utils.GetTimeAsMsSinceEpoch(t.lastActivity),
  158. }
  159. conn.Transfers = append(conn.Transfers, connTransfer)
  160. }
  161. }
  162. stats = append(stats, conn)
  163. }
  164. return stats
  165. }
  166. func startIdleTimer(maxIdleTime time.Duration) {
  167. idleConnectionTicker = time.NewTicker(5 * time.Minute)
  168. idleTimeout = maxIdleTime
  169. go func() {
  170. for t := range idleConnectionTicker.C {
  171. logger.Debug(logSender, "idle connections check ticker %v", t)
  172. checkIdleConnections()
  173. }
  174. }()
  175. }
  176. func checkIdleConnections() {
  177. mutex.RLock()
  178. defer mutex.RUnlock()
  179. for _, c := range openConnections {
  180. idleTime := time.Since(c.lastActivity)
  181. for _, t := range activeTransfers {
  182. if t.connectionID == c.ID {
  183. transferIdleTime := time.Since(t.lastActivity)
  184. if transferIdleTime < idleTime {
  185. logger.Debug(logSender, "idle time: %v setted to transfer idle time: %v connection id: %v",
  186. idleTime, transferIdleTime, c.ID)
  187. idleTime = transferIdleTime
  188. }
  189. }
  190. }
  191. if idleTime > idleTimeout {
  192. logger.Debug(logSender, "close idle connection id: %v idle time: %v", c.ID, idleTime)
  193. err := c.sshConn.Close()
  194. if err != nil {
  195. logger.Warn(logSender, "error closing idle connection: %v", err)
  196. }
  197. }
  198. }
  199. logger.Debug(logSender, "check idle connections ended")
  200. }
  201. func addConnection(id string, conn Connection) {
  202. mutex.Lock()
  203. defer mutex.Unlock()
  204. openConnections[id] = conn
  205. logger.Debug(logSender, "connection added, num open connections: %v", len(openConnections))
  206. }
  207. func removeConnection(id string) {
  208. mutex.Lock()
  209. defer mutex.Unlock()
  210. delete(openConnections, id)
  211. logger.Debug(logSender, "connection removed, num open connections: %v", len(openConnections))
  212. }
// addTransfer appends transfer to the list of the active transfers while
// holding the write lock. There is no check for duplicates.
func addTransfer(transfer *Transfer) {
	mutex.Lock()
	defer mutex.Unlock()
	activeTransfers = append(activeTransfers, transfer)
}
  218. func removeTransfer(transfer *Transfer) {
  219. mutex.Lock()
  220. defer mutex.Unlock()
  221. indexToRemove := -1
  222. for i, v := range activeTransfers {
  223. if v == transfer {
  224. indexToRemove = i
  225. break
  226. }
  227. }
  228. if indexToRemove >= 0 {
  229. activeTransfers[indexToRemove] = activeTransfers[len(activeTransfers)-1]
  230. activeTransfers = activeTransfers[:len(activeTransfers)-1]
  231. } else {
  232. logger.Warn(logSender, "transfer to remove not found!")
  233. }
  234. }
  235. func updateConnectionActivity(id string) {
  236. mutex.Lock()
  237. defer mutex.Unlock()
  238. if c, ok := openConnections[id]; ok {
  239. c.lastActivity = time.Now()
  240. openConnections[id] = c
  241. }
  242. }