1
0

common.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983
  1. // Package common defines code shared among file transfer packages and protocols
  2. package common
  3. import (
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "os/exec"
  12. "path/filepath"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. "github.com/pires/go-proxyproto"
  18. "github.com/drakkan/sftpgo/v2/dataprovider"
  19. "github.com/drakkan/sftpgo/v2/httpclient"
  20. "github.com/drakkan/sftpgo/v2/logger"
  21. "github.com/drakkan/sftpgo/v2/metric"
  22. "github.com/drakkan/sftpgo/v2/util"
  23. "github.com/drakkan/sftpgo/v2/vfs"
  24. )
// Log sender names, operation/action names and other package-level constants.
const (
	// logSender is the sender name this package passes to the logger
	logSender = "common"
	// log sender names, one for each kind of file operation
	uploadLogSender   = "Upload"
	downloadLogSender = "Download"
	renameLogSender   = "Rename"
	rmdirLogSender    = "Rmdir"
	mkdirLogSender    = "Mkdir"
	symlinkLogSender  = "Symlink"
	removeLogSender   = "Remove"
	chownLogSender    = "Chown"
	chmodLogSender    = "Chmod"
	chtimesLogSender  = "Chtimes"
	truncateLogSender = "Truncate"
	// operation names; upload and download are also used as the
	// OperationType of a ConnectionTransfer
	operationDownload = "download"
	operationUpload   = "upload"
	operationDelete   = "delete"
	// OperationPreDownload is the pre-download action name
	OperationPreDownload = "pre-download"
	// OperationPreUpload is the pre-upload action name
	OperationPreUpload = "pre-upload"
	operationPreDelete = "pre-delete"
	operationRename    = "rename"
	operationMkdir     = "mkdir"
	operationRmdir     = "rmdir"
	// OperationSSHCmd is the SSH command action name
	OperationSSHCmd = "ssh_cmd"
	chtimesFormat   = "2006-01-02T15:04:05" // YYYY-MM-DDTHH:MM:SS
	// idleTimeoutCheckInterval is the tick interval Initialize passes to the
	// idle timeout ticker
	idleTimeoutCheckInterval = 3 * time.Minute
)
  55. // Stat flags
  56. const (
  57. StatAttrUIDGID = 1
  58. StatAttrPerms = 2
  59. StatAttrTimes = 4
  60. StatAttrSize = 8
  61. )
  62. // Transfer types
  63. const (
  64. TransferUpload = iota
  65. TransferDownload
  66. )
// Supported protocols: these names identify the protocol of a connection
// and are the keys of the rate limiters map
const (
	ProtocolSFTP   = "SFTP"
	ProtocolSCP    = "SCP"
	ProtocolSSH    = "SSH"
	ProtocolFTP    = "FTP"
	ProtocolWebDAV = "DAV"
	ProtocolHTTP   = "HTTP"
)
  76. // Upload modes
  77. const (
  78. UploadModeStandard = iota
  79. UploadModeAtomic
  80. UploadModeAtomicWithResume
  81. )
  82. func init() {
  83. Connections.clients = clientsMap{
  84. clients: make(map[string]int),
  85. }
  86. }
// errors definitions: sentinel errors shared among the protocol implementations
var (
	ErrPermissionDenied     = errors.New("permission denied")
	ErrNotExist             = errors.New("no such file or directory")
	ErrOpUnsupported        = errors.New("operation unsupported")
	ErrGenericFailure       = errors.New("failure")
	ErrQuotaExceeded        = errors.New("denying write due to space limit")
	ErrSkipPermissionsCheck = errors.New("permission check skipped")
	ErrConnectionDenied     = errors.New("you are not allowed to connect")
	ErrNoBinding            = errors.New("no binding configured")
	ErrCrtRevoked           = errors.New("your certificate has been revoked")
	ErrNoCredentials        = errors.New("no credential provided")
	ErrInternalFailure      = errors.New("internal failure")
	// unexported errors, only visible inside this package
	errNoTransfer       = errors.New("requested transfer not found")
	errTransferMismatch = errors.New("transfer mismatch")
)
// Package-level state
var (
	// Config is the configuration for the supported protocols
	Config Configuration
	// Connections is the list of active connections
	Connections ActiveConnections
	// QuotaScans is the list of active quota scans
	QuotaScans ActiveScans
	// idleTimeoutTicker and idleTimeoutTickerDone are managed by
	// startIdleTimeoutTicker/stopIdleTimeoutTicker: the ticker triggers the
	// idle connections check, the channel stops the checking goroutine
	idleTimeoutTicker     *time.Ticker
	idleTimeoutTickerDone chan bool
	supportedProtocols    = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV, ProtocolHTTP}
	// the map key is the protocol, for each protocol we can have multiple rate limiters
	rateLimiters map[string][]*rateLimiter
)
  116. // Initialize sets the common configuration
  117. func Initialize(c Configuration) error {
  118. Config = c
  119. Config.idleLoginTimeout = 2 * time.Minute
  120. Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
  121. if Config.IdleTimeout > 0 {
  122. startIdleTimeoutTicker(idleTimeoutCheckInterval)
  123. }
  124. Config.defender = nil
  125. if c.DefenderConfig.Enabled {
  126. defender, err := newInMemoryDefender(&c.DefenderConfig)
  127. if err != nil {
  128. return fmt.Errorf("defender initialization error: %v", err)
  129. }
  130. logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
  131. Config.defender = defender
  132. }
  133. rateLimiters = make(map[string][]*rateLimiter)
  134. for _, rlCfg := range c.RateLimitersConfig {
  135. if rlCfg.isEnabled() {
  136. if err := rlCfg.validate(); err != nil {
  137. return fmt.Errorf("rate limiters initialization error: %v", err)
  138. }
  139. rateLimiter := rlCfg.getLimiter()
  140. for _, protocol := range rlCfg.Protocols {
  141. rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
  142. }
  143. }
  144. }
  145. vfs.SetTempPath(c.TempPath)
  146. dataprovider.SetTempPath(c.TempPath)
  147. return nil
  148. }
  149. // LimitRate blocks until all the configured rate limiters
  150. // allow one event to happen.
  151. // It returns an error if the time to wait exceeds the max
  152. // allowed delay
  153. func LimitRate(protocol, ip string) (time.Duration, error) {
  154. for _, limiter := range rateLimiters[protocol] {
  155. if delay, err := limiter.Wait(ip); err != nil {
  156. logger.Debug(logSender, "", "protocol %v ip %v: %v", protocol, ip, err)
  157. return delay, err
  158. }
  159. }
  160. return 0, nil
  161. }
  162. // ReloadDefender reloads the defender's block and safe lists
  163. func ReloadDefender() error {
  164. if Config.defender == nil {
  165. return nil
  166. }
  167. return Config.defender.Reload()
  168. }
  169. // IsBanned returns true if the specified IP address is banned
  170. func IsBanned(ip string) bool {
  171. if Config.defender == nil {
  172. return false
  173. }
  174. return Config.defender.IsBanned(ip)
  175. }
  176. // GetDefenderBanTime returns the ban time for the given IP
  177. // or nil if the IP is not banned or the defender is disabled
  178. func GetDefenderBanTime(ip string) *time.Time {
  179. if Config.defender == nil {
  180. return nil
  181. }
  182. return Config.defender.GetBanTime(ip)
  183. }
  184. // GetDefenderHosts returns hosts that are banned or for which some violations have been detected
  185. func GetDefenderHosts() []*DefenderEntry {
  186. if Config.defender == nil {
  187. return nil
  188. }
  189. return Config.defender.GetHosts()
  190. }
  191. // GetDefenderHost returns a defender host by ip, if any
  192. func GetDefenderHost(ip string) (*DefenderEntry, error) {
  193. if Config.defender == nil {
  194. return nil, errors.New("defender is disabled")
  195. }
  196. return Config.defender.GetHost(ip)
  197. }
  198. // DeleteDefenderHost removes the specified IP address from the defender lists
  199. func DeleteDefenderHost(ip string) bool {
  200. if Config.defender == nil {
  201. return false
  202. }
  203. return Config.defender.DeleteHost(ip)
  204. }
  205. // GetDefenderScore returns the score for the given IP
  206. func GetDefenderScore(ip string) int {
  207. if Config.defender == nil {
  208. return 0
  209. }
  210. return Config.defender.GetScore(ip)
  211. }
  212. // AddDefenderEvent adds the specified defender event for the given IP
  213. func AddDefenderEvent(ip string, event HostEvent) {
  214. if Config.defender == nil {
  215. return
  216. }
  217. Config.defender.AddEvent(ip, event)
  218. }
  219. // the ticker cannot be started/stopped from multiple goroutines
  220. func startIdleTimeoutTicker(duration time.Duration) {
  221. stopIdleTimeoutTicker()
  222. idleTimeoutTicker = time.NewTicker(duration)
  223. idleTimeoutTickerDone = make(chan bool)
  224. go func() {
  225. for {
  226. select {
  227. case <-idleTimeoutTickerDone:
  228. return
  229. case <-idleTimeoutTicker.C:
  230. Connections.checkIdles()
  231. }
  232. }
  233. }()
  234. }
  235. func stopIdleTimeoutTicker() {
  236. if idleTimeoutTicker != nil {
  237. idleTimeoutTicker.Stop()
  238. idleTimeoutTickerDone <- true
  239. idleTimeoutTicker = nil
  240. }
  241. }
// ActiveTransfer defines the interface for the current active transfers.
// Transfers are registered on a connection using ActiveConnection.AddTransfer
// and removed using ActiveConnection.RemoveTransfer.
// NOTE(review): the implementations live outside this file section, the
// per-method semantics below are inferred from the names — confirm there
type ActiveTransfer interface {
	GetID() uint64
	// GetType presumably returns TransferUpload or TransferDownload
	GetType() int
	GetSize() int64
	GetVirtualPath() string
	GetStartTime() time.Time
	SignalClose()
	Truncate(fsPath string, size int64) (int64, error)
	GetRealFsPath(fsPath string) string
}
// ActiveConnection defines the interface for the current active connections.
// The connections are tracked by ActiveConnections
type ActiveConnection interface {
	// GetID returns the identifier used to look up the connection in
	// Swap, Remove and Close
	GetID() string
	// GetUsername returns the connection username, it is empty for an FTP
	// connection that has not authenticated yet (see checkIdles)
	GetUsername() string
	GetLocalAddress() string
	GetRemoteAddress() string
	GetClientVersion() string
	// GetProtocol returns the protocol name, it is used as log sender
	GetProtocol() string
	GetConnectionTime() time.Time
	// GetLastActivity is used to detect idle connections
	GetLastActivity() time.Time
	GetCommand() string
	// Disconnect is invoked to close an idle connection or on explicit request
	Disconnect() error
	AddTransfer(t ActiveTransfer)
	RemoveTransfer(t ActiveTransfer)
	// GetTransfers returns the transfers to expose in the connection stats
	GetTransfers() []ConnectionTransfer
	// CloseFS is invoked when the connection is removed from the active ones
	CloseFS() error
}
// StatAttributes defines the attributes for set stat commands
type StatAttributes struct {
	Mode  os.FileMode
	Atime time.Time
	Mtime time.Time
	UID   int
	GID   int
	// Flags presumably is a combination of the StatAttr* constants telling
	// which of the other fields are set — confirm against the callers
	Flags int
	Size  int64
}
// ConnectionTransfer defines the transfer details to expose
type ConnectionTransfer struct {
	// ID is not included in the JSON serialization
	ID uint64 `json:"-"`
	// OperationType is matched against the upload/download operation names
	// when the transfer is rendered as string
	OperationType string `json:"operation_type"`
	// StartTime is expressed as milliseconds since epoch
	StartTime   int64  `json:"start_time"`
	Size        int64  `json:"size"`
	VirtualPath string `json:"path"`
}
  288. func (t *ConnectionTransfer) getConnectionTransferAsString() string {
  289. result := ""
  290. switch t.OperationType {
  291. case operationUpload:
  292. result += "UL "
  293. case operationDownload:
  294. result += "DL "
  295. }
  296. result += fmt.Sprintf("%#v ", t.VirtualPath)
  297. if t.Size > 0 {
  298. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(t.StartTime))
  299. speed := float64(t.Size) / float64(util.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  300. result += fmt.Sprintf("Size: %#v Elapsed: %#v Speed: \"%.1f KB/s\"", util.ByteCountIEC(t.Size),
  301. util.GetDurationAsString(elapsed), speed)
  302. }
  303. return result
  304. }
// Configuration defines configuration parameters common to all supported protocols
type Configuration struct {
	// Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
	// 0 means disabled
	IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
	// UploadMode 0 means standard, the files are uploaded directly to the requested path.
	// 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
	// when the client ends the upload. Atomic mode avoids problems such as a web server that
	// serves partial files when the files are being uploaded.
	// In atomic mode if there is an upload error the temporary file is deleted and so the requested
	// upload path will not contain a partial file.
	// 2 means atomic with resume support: as atomic but if there is an upload error the temporary
	// file is renamed to the requested path and not deleted, this way a client can reconnect and resume
	// the upload.
	UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
	// Actions to execute for SFTP file operations and SSH commands
	Actions ProtocolActions `json:"actions" mapstructure:"actions"`
	// SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
	// 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
	// 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group/time are
	// silently ignored for cloud based filesystems such as S3, GCS, Azure Blob
	SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
	// TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
	// If you set this option you must make sure that the defined path exists, is accessible for writing
	// by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise
	// the renaming for atomic uploads will become a copy and therefore may take a long time.
	// The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
	TempPath string `json:"temp_path" mapstructure:"temp_path"`
	// Support for HAProxy PROXY protocol.
	// If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGINX, you can enable
	// the proxy protocol. It provides a convenient way to safely transport connection information
	// such as a client's address across multiple layers of NAT or TCP proxies to get the real
	// client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
	// - 0 means disabled
	// - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
	// - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
	// If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
	// for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
	ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
	// List of IP addresses and IP ranges allowed to send the proxy header.
	// If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
	// connection will be accepted and the header will be ignored.
	// If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
	// connection will be rejected.
	ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
	// Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
	// If you define an HTTP URL it will be invoked using a `GET` request.
	// Please note that SFTPGo services may not yet be available when this hook is run.
	// Leave empty to disable.
	StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
	// Absolute path to an external program or an HTTP URL to invoke after a user connects
	// and before they try to login. It allows you to reject the connection based on the source
	// ip address. Leave empty to disable.
	PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
	// Maximum number of concurrent client connections. 0 means unlimited
	MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
	// Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
	MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
	// Defender configuration
	DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
	// Rate limiter configurations
	RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
	// IdleTimeout converted to a duration, set by Initialize
	idleTimeoutAsDuration time.Duration
	// idle timeout applied to FTP connections that have not authenticated yet,
	// Initialize sets it to 2 minutes
	idleLoginTimeout time.Duration
	// defender is set by Initialize, it is nil if the defender is disabled
	defender Defender
}
  371. // IsAtomicUploadEnabled returns true if atomic upload is enabled
  372. func (c *Configuration) IsAtomicUploadEnabled() bool {
  373. return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
  374. }
  375. // GetProxyListener returns a wrapper for the given listener that supports the
  376. // HAProxy Proxy Protocol
  377. func (c *Configuration) GetProxyListener(listener net.Listener) (*proxyproto.Listener, error) {
  378. var err error
  379. if c.ProxyProtocol > 0 {
  380. var policyFunc func(upstream net.Addr) (proxyproto.Policy, error)
  381. if c.ProxyProtocol == 1 && len(c.ProxyAllowed) > 0 {
  382. policyFunc, err = proxyproto.LaxWhiteListPolicy(c.ProxyAllowed)
  383. if err != nil {
  384. return nil, err
  385. }
  386. }
  387. if c.ProxyProtocol == 2 {
  388. if len(c.ProxyAllowed) == 0 {
  389. policyFunc = func(upstream net.Addr) (proxyproto.Policy, error) {
  390. return proxyproto.REQUIRE, nil
  391. }
  392. } else {
  393. policyFunc, err = proxyproto.StrictWhiteListPolicy(c.ProxyAllowed)
  394. if err != nil {
  395. return nil, err
  396. }
  397. }
  398. }
  399. return &proxyproto.Listener{
  400. Listener: listener,
  401. Policy: policyFunc,
  402. ReadHeaderTimeout: 5 * time.Second,
  403. }, nil
  404. }
  405. return nil, errors.New("proxy protocol not configured")
  406. }
  407. // ExecuteStartupHook runs the startup hook if defined
  408. func (c *Configuration) ExecuteStartupHook() error {
  409. if c.StartupHook == "" {
  410. return nil
  411. }
  412. if strings.HasPrefix(c.StartupHook, "http") {
  413. var url *url.URL
  414. url, err := url.Parse(c.StartupHook)
  415. if err != nil {
  416. logger.Warn(logSender, "", "Invalid startup hook %#v: %v", c.StartupHook, err)
  417. return err
  418. }
  419. startTime := time.Now()
  420. resp, err := httpclient.RetryableGet(url.String())
  421. if err != nil {
  422. logger.Warn(logSender, "", "Error executing startup hook: %v", err)
  423. return err
  424. }
  425. defer resp.Body.Close()
  426. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
  427. return nil
  428. }
  429. if !filepath.IsAbs(c.StartupHook) {
  430. err := fmt.Errorf("invalid startup hook %#v", c.StartupHook)
  431. logger.Warn(logSender, "", "Invalid startup hook %#v", c.StartupHook)
  432. return err
  433. }
  434. startTime := time.Now()
  435. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  436. defer cancel()
  437. cmd := exec.CommandContext(ctx, c.StartupHook)
  438. err := cmd.Run()
  439. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, error: %v", time.Since(startTime), err)
  440. return nil
  441. }
  442. // ExecutePostConnectHook executes the post connect hook if defined
  443. func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
  444. if c.PostConnectHook == "" {
  445. return nil
  446. }
  447. if strings.HasPrefix(c.PostConnectHook, "http") {
  448. var url *url.URL
  449. url, err := url.Parse(c.PostConnectHook)
  450. if err != nil {
  451. logger.Warn(protocol, "", "Login from ip %#v denied, invalid post connect hook %#v: %v",
  452. ipAddr, c.PostConnectHook, err)
  453. return err
  454. }
  455. q := url.Query()
  456. q.Add("ip", ipAddr)
  457. q.Add("protocol", protocol)
  458. url.RawQuery = q.Encode()
  459. resp, err := httpclient.RetryableGet(url.String())
  460. if err != nil {
  461. logger.Warn(protocol, "", "Login from ip %#v denied, error executing post connect hook: %v", ipAddr, err)
  462. return err
  463. }
  464. defer resp.Body.Close()
  465. if resp.StatusCode != http.StatusOK {
  466. logger.Warn(protocol, "", "Login from ip %#v denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
  467. return errUnexpectedHTTResponse
  468. }
  469. return nil
  470. }
  471. if !filepath.IsAbs(c.PostConnectHook) {
  472. err := fmt.Errorf("invalid post connect hook %#v", c.PostConnectHook)
  473. logger.Warn(protocol, "", "Login from ip %#v denied: %v", ipAddr, err)
  474. return err
  475. }
  476. ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
  477. defer cancel()
  478. cmd := exec.CommandContext(ctx, c.PostConnectHook)
  479. cmd.Env = append(os.Environ(),
  480. fmt.Sprintf("SFTPGO_CONNECTION_IP=%v", ipAddr),
  481. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%v", protocol))
  482. err := cmd.Run()
  483. if err != nil {
  484. logger.Warn(protocol, "", "Login from ip %#v denied, connect hook error: %v", ipAddr, err)
  485. }
  486. return err
  487. }
  488. // SSHConnection defines an ssh connection.
  489. // Each SSH connection can open several channels for SFTP or SSH commands
  490. type SSHConnection struct {
  491. id string
  492. conn net.Conn
  493. lastActivity int64
  494. }
  495. // NewSSHConnection returns a new SSHConnection
  496. func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
  497. return &SSHConnection{
  498. id: id,
  499. conn: conn,
  500. lastActivity: time.Now().UnixNano(),
  501. }
  502. }
  503. // GetID returns the ID for this SSHConnection
  504. func (c *SSHConnection) GetID() string {
  505. return c.id
  506. }
  507. // UpdateLastActivity updates last activity for this connection
  508. func (c *SSHConnection) UpdateLastActivity() {
  509. atomic.StoreInt64(&c.lastActivity, time.Now().UnixNano())
  510. }
  511. // GetLastActivity returns the last connection activity
  512. func (c *SSHConnection) GetLastActivity() time.Time {
  513. return time.Unix(0, atomic.LoadInt64(&c.lastActivity))
  514. }
  515. // Close closes the underlying network connection
  516. func (c *SSHConnection) Close() error {
  517. return c.conn.Close()
  518. }
// ActiveConnections holds the current active connections with the associated transfers
type ActiveConnections struct {
	// clients contains both authenticated and established connections and the ones waiting
	// for authentication
	clients clientsMap
	// the embedded mutex protects the connections and sshConnections slices
	sync.RWMutex
	connections    []ActiveConnection
	sshConnections []*SSHConnection
}
  528. // GetActiveSessions returns the number of active sessions for the given username.
  529. // We return the open sessions for any protocol
  530. func (conns *ActiveConnections) GetActiveSessions(username string) int {
  531. conns.RLock()
  532. defer conns.RUnlock()
  533. numSessions := 0
  534. for _, c := range conns.connections {
  535. if c.GetUsername() == username {
  536. numSessions++
  537. }
  538. }
  539. return numSessions
  540. }
  541. // Add adds a new connection to the active ones
  542. func (conns *ActiveConnections) Add(c ActiveConnection) {
  543. conns.Lock()
  544. defer conns.Unlock()
  545. conns.connections = append(conns.connections, c)
  546. metric.UpdateActiveConnectionsSize(len(conns.connections))
  547. logger.Debug(c.GetProtocol(), c.GetID(), "connection added, local address %#v, remote address %#v, num open connections: %v",
  548. c.GetLocalAddress(), c.GetRemoteAddress(), len(conns.connections))
  549. }
  550. // Swap replaces an existing connection with the given one.
  551. // This method is useful if you have to change some connection details
  552. // for example for FTP is used to update the connection once the user
  553. // authenticates
  554. func (conns *ActiveConnections) Swap(c ActiveConnection) error {
  555. conns.Lock()
  556. defer conns.Unlock()
  557. for idx, conn := range conns.connections {
  558. if conn.GetID() == c.GetID() {
  559. conn = nil
  560. conns.connections[idx] = c
  561. return nil
  562. }
  563. }
  564. return errors.New("connection to swap not found")
  565. }
  566. // Remove removes a connection from the active ones
  567. func (conns *ActiveConnections) Remove(connectionID string) {
  568. conns.Lock()
  569. defer conns.Unlock()
  570. for idx, conn := range conns.connections {
  571. if conn.GetID() == connectionID {
  572. err := conn.CloseFS()
  573. lastIdx := len(conns.connections) - 1
  574. conns.connections[idx] = conns.connections[lastIdx]
  575. conns.connections[lastIdx] = nil
  576. conns.connections = conns.connections[:lastIdx]
  577. metric.UpdateActiveConnectionsSize(lastIdx)
  578. logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, local address %#v, remote address %#v close fs error: %v, num open connections: %v",
  579. conn.GetLocalAddress(), conn.GetRemoteAddress(), err, lastIdx)
  580. return
  581. }
  582. }
  583. logger.Warn(logSender, "", "connection id %#v to remove not found!", connectionID)
  584. }
  585. // Close closes an active connection.
  586. // It returns true on success
  587. func (conns *ActiveConnections) Close(connectionID string) bool {
  588. conns.RLock()
  589. result := false
  590. for _, c := range conns.connections {
  591. if c.GetID() == connectionID {
  592. defer func(conn ActiveConnection) {
  593. err := conn.Disconnect()
  594. logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
  595. }(c)
  596. result = true
  597. break
  598. }
  599. }
  600. conns.RUnlock()
  601. return result
  602. }
  603. // AddSSHConnection adds a new ssh connection to the active ones
  604. func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
  605. conns.Lock()
  606. defer conns.Unlock()
  607. conns.sshConnections = append(conns.sshConnections, c)
  608. logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %v", len(conns.sshConnections))
  609. }
  610. // RemoveSSHConnection removes a connection from the active ones
  611. func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
  612. conns.Lock()
  613. defer conns.Unlock()
  614. for idx, conn := range conns.sshConnections {
  615. if conn.GetID() == connectionID {
  616. lastIdx := len(conns.sshConnections) - 1
  617. conns.sshConnections[idx] = conns.sshConnections[lastIdx]
  618. conns.sshConnections[lastIdx] = nil
  619. conns.sshConnections = conns.sshConnections[:lastIdx]
  620. logger.Debug(logSender, conn.GetID(), "ssh connection removed, num open ssh connections: %v", lastIdx)
  621. return
  622. }
  623. }
  624. logger.Warn(logSender, "", "ssh connection to remove with id %#v not found!", connectionID)
  625. }
  626. func (conns *ActiveConnections) checkIdles() {
  627. conns.RLock()
  628. for _, sshConn := range conns.sshConnections {
  629. idleTime := time.Since(sshConn.GetLastActivity())
  630. if idleTime > Config.idleTimeoutAsDuration {
  631. // we close the an ssh connection if it has no active connections associated
  632. idToMatch := fmt.Sprintf("_%v_", sshConn.GetID())
  633. toClose := true
  634. for _, conn := range conns.connections {
  635. if strings.Contains(conn.GetID(), idToMatch) {
  636. toClose = false
  637. break
  638. }
  639. }
  640. if toClose {
  641. defer func(c *SSHConnection) {
  642. err := c.Close()
  643. logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
  644. time.Since(c.GetLastActivity()), err)
  645. }(sshConn)
  646. }
  647. }
  648. }
  649. for _, c := range conns.connections {
  650. idleTime := time.Since(c.GetLastActivity())
  651. isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
  652. if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
  653. defer func(conn ActiveConnection, isFTPNoAuth bool) {
  654. err := conn.Disconnect()
  655. logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %v, username: %#v close err: %v",
  656. time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
  657. if isFTPNoAuth {
  658. ip := util.GetIPFromRemoteAddress(c.GetRemoteAddress())
  659. logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTryed, c.GetProtocol(), "client idle")
  660. metric.AddNoAuthTryed()
  661. AddDefenderEvent(ip, HostEventNoLoginTried)
  662. dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTryed, ip, c.GetProtocol(),
  663. dataprovider.ErrNoAuthTryed)
  664. }
  665. }(c, isUnauthenticatedFTPUser)
  666. }
  667. }
  668. conns.RUnlock()
  669. }
// AddClientConnection stores a new client connection for the given IP
// address in the clients map
func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
	conns.clients.add(ipAddr)
}
// RemoveClientConnection removes a disconnected client, identified by its
// IP address, from the tracked ones
func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
	conns.clients.remove(ipAddr)
}
// GetClientConnections returns the total number of client connections as
// reported by the clients map
func (conns *ActiveConnections) GetClientConnections() int32 {
	return conns.clients.getTotal()
}
  682. // IsNewConnectionAllowed returns false if the maximum number of concurrent allowed connections is exceeded
  683. func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr string) bool {
  684. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  685. return true
  686. }
  687. if Config.MaxPerHostConnections > 0 {
  688. if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
  689. logger.Debug(logSender, "", "active connections from %v %v/%v", ipAddr, total, Config.MaxPerHostConnections)
  690. AddDefenderEvent(ipAddr, HostEventLimitExceeded)
  691. return false
  692. }
  693. }
  694. if Config.MaxTotalConnections > 0 {
  695. if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
  696. logger.Debug(logSender, "", "active client connections %v/%v", total, Config.MaxTotalConnections)
  697. return false
  698. }
  699. // on a single SFTP connection we could have multiple SFTP channels or commands
  700. // so we check the estabilished connections too
  701. conns.RLock()
  702. defer conns.RUnlock()
  703. return len(conns.connections) < Config.MaxTotalConnections
  704. }
  705. return true
  706. }
  707. // GetStats returns stats for active connections
  708. func (conns *ActiveConnections) GetStats() []*ConnectionStatus {
  709. conns.RLock()
  710. defer conns.RUnlock()
  711. stats := make([]*ConnectionStatus, 0, len(conns.connections))
  712. for _, c := range conns.connections {
  713. stat := &ConnectionStatus{
  714. Username: c.GetUsername(),
  715. ConnectionID: c.GetID(),
  716. ClientVersion: c.GetClientVersion(),
  717. RemoteAddress: c.GetRemoteAddress(),
  718. ConnectionTime: util.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
  719. LastActivity: util.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
  720. Protocol: c.GetProtocol(),
  721. Command: c.GetCommand(),
  722. Transfers: c.GetTransfers(),
  723. }
  724. stats = append(stats, stat)
  725. }
  726. return stats
  727. }
  728. // ConnectionStatus returns the status for an active connection
  729. type ConnectionStatus struct {
  730. // Logged in username
  731. Username string `json:"username"`
  732. // Unique identifier for the connection
  733. ConnectionID string `json:"connection_id"`
  734. // client's version string
  735. ClientVersion string `json:"client_version,omitempty"`
  736. // Remote address for this connection
  737. RemoteAddress string `json:"remote_address"`
  738. // Connection time as unix timestamp in milliseconds
  739. ConnectionTime int64 `json:"connection_time"`
  740. // Last activity as unix timestamp in milliseconds
  741. LastActivity int64 `json:"last_activity"`
  742. // Protocol for this connection
  743. Protocol string `json:"protocol"`
  744. // active uploads/downloads
  745. Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
  746. // SSH command or WebDAV method
  747. Command string `json:"command,omitempty"`
  748. }
  749. // GetConnectionDuration returns the connection duration as string
  750. func (c *ConnectionStatus) GetConnectionDuration() string {
  751. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  752. return util.GetDurationAsString(elapsed)
  753. }
  754. // GetConnectionInfo returns connection info.
  755. // Protocol,Client Version and RemoteAddress are returned.
  756. func (c *ConnectionStatus) GetConnectionInfo() string {
  757. var result strings.Builder
  758. result.WriteString(fmt.Sprintf("%v. Client: %#v From: %#v", c.Protocol, c.ClientVersion, c.RemoteAddress))
  759. if c.Command == "" {
  760. return result.String()
  761. }
  762. switch c.Protocol {
  763. case ProtocolSSH, ProtocolFTP:
  764. result.WriteString(fmt.Sprintf(". Command: %#v", c.Command))
  765. case ProtocolWebDAV:
  766. result.WriteString(fmt.Sprintf(". Method: %#v", c.Command))
  767. }
  768. return result.String()
  769. }
  770. // GetTransfersAsString returns the active transfers as string
  771. func (c *ConnectionStatus) GetTransfersAsString() string {
  772. result := ""
  773. for _, t := range c.Transfers {
  774. if result != "" {
  775. result += ". "
  776. }
  777. result += t.getConnectionTransferAsString()
  778. }
  779. return result
  780. }
  // ActiveQuotaScan describes a quota scan in progress for a user home dir.
  type ActiveQuotaScan struct {
  	// Username is the user the quota scan refers to.
  	Username string `json:"username"`
  	// StartTime is the scan start time as Unix timestamp in milliseconds.
  	StartTime int64 `json:"start_time"`
  }
  // ActiveVirtualFolderQuotaScan describes a quota scan in progress for a
  // virtual folder.
  type ActiveVirtualFolderQuotaScan struct {
  	// Name is the name of the folder the quota scan refers to.
  	Name string `json:"name"`
  	// StartTime is the scan start time as Unix timestamp in milliseconds.
  	StartTime int64 `json:"start_time"`
  }
  795. // ActiveScans holds the active quota scans
  796. type ActiveScans struct {
  797. sync.RWMutex
  798. UserScans []ActiveQuotaScan
  799. FolderScans []ActiveVirtualFolderQuotaScan
  800. }
  801. // GetUsersQuotaScans returns the active quota scans for users home directories
  802. func (s *ActiveScans) GetUsersQuotaScans() []ActiveQuotaScan {
  803. s.RLock()
  804. defer s.RUnlock()
  805. scans := make([]ActiveQuotaScan, len(s.UserScans))
  806. copy(scans, s.UserScans)
  807. return scans
  808. }
  809. // AddUserQuotaScan adds a user to the ones with active quota scans.
  810. // Returns false if the user has a quota scan already running
  811. func (s *ActiveScans) AddUserQuotaScan(username string) bool {
  812. s.Lock()
  813. defer s.Unlock()
  814. for _, scan := range s.UserScans {
  815. if scan.Username == username {
  816. return false
  817. }
  818. }
  819. s.UserScans = append(s.UserScans, ActiveQuotaScan{
  820. Username: username,
  821. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  822. })
  823. return true
  824. }
  825. // RemoveUserQuotaScan removes a user from the ones with active quota scans.
  826. // Returns false if the user has no active quota scans
  827. func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
  828. s.Lock()
  829. defer s.Unlock()
  830. for idx, scan := range s.UserScans {
  831. if scan.Username == username {
  832. lastIdx := len(s.UserScans) - 1
  833. s.UserScans[idx] = s.UserScans[lastIdx]
  834. s.UserScans = s.UserScans[:lastIdx]
  835. return true
  836. }
  837. }
  838. return false
  839. }
  840. // GetVFoldersQuotaScans returns the active quota scans for virtual folders
  841. func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
  842. s.RLock()
  843. defer s.RUnlock()
  844. scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
  845. copy(scans, s.FolderScans)
  846. return scans
  847. }
  848. // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
  849. // Returns false if the folder has a quota scan already running
  850. func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
  851. s.Lock()
  852. defer s.Unlock()
  853. for _, scan := range s.FolderScans {
  854. if scan.Name == folderName {
  855. return false
  856. }
  857. }
  858. s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
  859. Name: folderName,
  860. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  861. })
  862. return true
  863. }
  864. // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
  865. // Returns false if the folder has no active quota scans
  866. func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
  867. s.Lock()
  868. defer s.Unlock()
  869. for idx, scan := range s.FolderScans {
  870. if scan.Name == folderName {
  871. lastIdx := len(s.FolderScans) - 1
  872. s.FolderScans[idx] = s.FolderScans[lastIdx]
  873. s.FolderScans = s.FolderScans[:lastIdx]
  874. return true
  875. }
  876. }
  877. return false
  878. }