common.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package common defines code shared among file transfer packages and protocols
  15. package common
  16. import (
  17. "context"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. "path/filepath"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/pires/go-proxyproto"
  32. "github.com/drakkan/sftpgo/v2/internal/command"
  33. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  34. "github.com/drakkan/sftpgo/v2/internal/httpclient"
  35. "github.com/drakkan/sftpgo/v2/internal/logger"
  36. "github.com/drakkan/sftpgo/v2/internal/metric"
  37. "github.com/drakkan/sftpgo/v2/internal/plugin"
  38. "github.com/drakkan/sftpgo/v2/internal/smtp"
  39. "github.com/drakkan/sftpgo/v2/internal/util"
  40. "github.com/drakkan/sftpgo/v2/internal/vfs"
  41. )
  // Log senders: the package-wide sender plus one sender per file operation.
  const (
  	logSender         = "common"
  	uploadLogSender   = "Upload"
  	downloadLogSender = "Download"
  	renameLogSender   = "Rename"
  	rmdirLogSender    = "Rmdir"
  	mkdirLogSender    = "Mkdir"
  	symlinkLogSender  = "Symlink"
  	removeLogSender   = "Remove"
  	chownLogSender    = "Chown"
  	chmodLogSender    = "Chmod"
  	chtimesLogSender  = "Chtimes"
  	copyLogSender     = "Copy"
  	truncateLogSender = "Truncate"
  )

  // Operation (action) names.
  const (
  	operationDownload      = "download"
  	operationUpload        = "upload"
  	operationFirstDownload = "first-download"
  	operationFirstUpload   = "first-upload"
  	operationDelete        = "delete"
  	operationCopy          = "copy"
  	// OperationPreDownload is the pre-download action name
  	OperationPreDownload = "pre-download"
  	// OperationPreUpload is the pre-upload action name
  	OperationPreUpload = "pre-upload"
  	operationPreDelete = "pre-delete"
  	operationRename    = "rename"
  	operationMkdir     = "mkdir"
  	operationRmdir     = "rmdir"
  	// OperationSSHCmd is the SSH command action name
  	OperationSSHCmd = "ssh_cmd"
  )

  // Time layout and intervals of the periodic checks.
  const (
  	// chtimesFormat is a Go time layout equivalent to YYYY-MM-DDTHH:MM:SS
  	chtimesFormat = "2006-01-02T15:04:05"
  	// idleTimeoutCheckInterval is how often idle connections are checked
  	idleTimeoutCheckInterval = 3 * time.Minute
  	// periodicTimeoutCheckInterval is the base interval of the periodic checks
  	periodicTimeoutCheckInterval = time.Minute
  )
  // Stat flags. Each flag is a distinct bit so that several of them can be
  // combined in a single int value.
  const (
  	StatAttrUIDGID = 1 << iota // 1
  	StatAttrPerms              // 2
  	StatAttrTimes              // 4
  	StatAttrSize               // 8
  )
  // Transfer types
  const (
  	// TransferUpload identifies an upload, its value is 0
  	TransferUpload = iota
  	// TransferDownload identifies a download, its value is 1
  	TransferDownload
  )
  // Supported protocols: the names used to identify each protocol.
  const (
  	ProtocolSFTP          = "SFTP"
  	ProtocolSCP           = "SCP"
  	ProtocolSSH           = "SSH"
  	ProtocolFTP           = "FTP"
  	ProtocolWebDAV        = "DAV"
  	ProtocolHTTP          = "HTTP"
  	ProtocolHTTPShare     = "HTTPShare"
  	ProtocolDataRetention = "DataRetention"
  	ProtocolOIDC          = "OIDC"
  	// protocolEventAction is unexported: it is only usable from this package
  	protocolEventAction = "EventAction"
  )
  // Upload modes
  const (
  	// UploadModeStandard has value 0
  	UploadModeStandard = iota
  	// UploadModeAtomic has value 1
  	UploadModeAtomic
  	// UploadModeAtomicWithResume has value 2
  	UploadModeAtomicWithResume
  )
  108. func init() {
  109. Connections.clients = clientsMap{
  110. clients: make(map[string]int),
  111. }
  112. Connections.perUserConns = make(map[string]int)
  113. Connections.mapping = make(map[string]int)
  114. Connections.sshMapping = make(map[string]int)
  115. }
  // Exported sentinel errors.
  var (
  	ErrPermissionDenied  = errors.New("permission denied")
  	ErrNotExist          = errors.New("no such file or directory")
  	ErrOpUnsupported     = errors.New("operation unsupported")
  	ErrGenericFailure    = errors.New("failure")
  	ErrQuotaExceeded     = errors.New("denying write due to space limit")
  	ErrReadQuotaExceeded = errors.New("denying read due to quota limit")
  	ErrConnectionDenied  = errors.New("you are not allowed to connect")
  	ErrNoBinding         = errors.New("no binding configured")
  	ErrCrtRevoked        = errors.New("your certificate has been revoked")
  	ErrNoCredentials     = errors.New("no credential provided")
  	ErrInternalFailure   = errors.New("internal failure")
  	ErrTransferAborted   = errors.New("transfer aborted")
  	ErrShuttingDown      = errors.New("the service is shutting down")
  )

  // Sentinel errors private to this package.
  var (
  	errNoTransfer       = errors.New("requested transfer not found")
  	errTransferMismatch = errors.New("transfer mismatch")
  )
  134. var (
  135. // Config is the configuration for the supported protocols
  136. Config Configuration
  137. // Connections is the list of active connections
  138. Connections ActiveConnections
  139. // QuotaScans is the list of active quota scans
  140. QuotaScans ActiveScans
  141. // ActiveMetadataChecks holds the active metadata checks
  142. ActiveMetadataChecks MetadataChecks
  143. transfersChecker TransfersChecker
  144. supportedProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV,
  145. ProtocolHTTP, ProtocolHTTPShare, ProtocolOIDC}
  146. disconnHookProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP}
  147. // the map key is the protocol, for each protocol we can have multiple rate limiters
  148. rateLimiters map[string][]*rateLimiter
  149. isShuttingDown atomic.Bool
  150. ftpLoginCommands = []string{"PASS", "USER"}
  151. )
  152. // Initialize sets the common configuration
  153. func Initialize(c Configuration, isShared int) error {
  154. isShuttingDown.Store(false)
  155. Config = c
  156. Config.Actions.ExecuteOn = util.RemoveDuplicates(Config.Actions.ExecuteOn, true)
  157. Config.Actions.ExecuteSync = util.RemoveDuplicates(Config.Actions.ExecuteSync, true)
  158. Config.ProxyAllowed = util.RemoveDuplicates(Config.ProxyAllowed, true)
  159. Config.idleLoginTimeout = 2 * time.Minute
  160. Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
  161. startPeriodicChecks(periodicTimeoutCheckInterval, isShared)
  162. Config.defender = nil
  163. Config.allowList = nil
  164. Config.rateLimitersList = nil
  165. rateLimiters = make(map[string][]*rateLimiter)
  166. for _, rlCfg := range c.RateLimitersConfig {
  167. if rlCfg.isEnabled() {
  168. if err := rlCfg.validate(); err != nil {
  169. return fmt.Errorf("rate limiters initialization error: %w", err)
  170. }
  171. rateLimiter := rlCfg.getLimiter()
  172. for _, protocol := range rlCfg.Protocols {
  173. rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
  174. }
  175. }
  176. }
  177. if len(rateLimiters) > 0 {
  178. rateLimitersList, err := dataprovider.NewIPList(dataprovider.IPListTypeRateLimiterSafeList)
  179. if err != nil {
  180. return fmt.Errorf("unable to initialize ratelimiters list: %w", err)
  181. }
  182. Config.rateLimitersList = rateLimitersList
  183. }
  184. if c.DefenderConfig.Enabled {
  185. if !util.Contains(supportedDefenderDrivers, c.DefenderConfig.Driver) {
  186. return fmt.Errorf("unsupported defender driver %q", c.DefenderConfig.Driver)
  187. }
  188. var defender Defender
  189. var err error
  190. switch c.DefenderConfig.Driver {
  191. case DefenderDriverProvider:
  192. defender, err = newDBDefender(&c.DefenderConfig)
  193. default:
  194. defender, err = newInMemoryDefender(&c.DefenderConfig)
  195. }
  196. if err != nil {
  197. return fmt.Errorf("defender initialization error: %v", err)
  198. }
  199. logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
  200. Config.defender = defender
  201. }
  202. if c.AllowListStatus > 0 {
  203. allowList, err := dataprovider.NewIPList(dataprovider.IPListTypeAllowList)
  204. if err != nil {
  205. return fmt.Errorf("unable to initialize the allow list: %w", err)
  206. }
  207. logger.Info(logSender, "", "allow list initialized")
  208. Config.allowList = allowList
  209. }
  210. if err := c.initializeProxyProtocol(); err != nil {
  211. return err
  212. }
  213. vfs.SetTempPath(c.TempPath)
  214. dataprovider.SetTempPath(c.TempPath)
  215. vfs.SetAllowSelfConnections(c.AllowSelfConnections)
  216. vfs.SetRenameMode(c.RenameMode)
  217. dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
  218. transfersChecker = getTransfersChecker(isShared)
  219. return nil
  220. }
  221. // CheckClosing returns an error if the service is closing
  222. func CheckClosing() error {
  223. if isShuttingDown.Load() {
  224. return ErrShuttingDown
  225. }
  226. return nil
  227. }
  228. // WaitForTransfers waits, for the specified grace time, for currently ongoing
  229. // client-initiated transfer sessions to completes.
  230. // A zero graceTime means no wait
  231. func WaitForTransfers(graceTime int) {
  232. if graceTime == 0 {
  233. return
  234. }
  235. if isShuttingDown.Swap(true) {
  236. return
  237. }
  238. if activeHooks.Load() == 0 && getActiveConnections() == 0 {
  239. return
  240. }
  241. graceTimer := time.NewTimer(time.Duration(graceTime) * time.Second)
  242. ticker := time.NewTicker(3 * time.Second)
  243. for {
  244. select {
  245. case <-ticker.C:
  246. hooks := activeHooks.Load()
  247. logger.Info(logSender, "", "active hooks: %d", hooks)
  248. if hooks == 0 && getActiveConnections() == 0 {
  249. logger.Info(logSender, "", "no more active connections, graceful shutdown")
  250. ticker.Stop()
  251. graceTimer.Stop()
  252. return
  253. }
  254. case <-graceTimer.C:
  255. logger.Info(logSender, "", "grace time expired, hard shutdown")
  256. ticker.Stop()
  257. return
  258. }
  259. }
  260. }
  261. // getActiveConnections returns the number of connections with active transfers
  262. func getActiveConnections() int {
  263. var activeConns int
  264. Connections.RLock()
  265. for _, c := range Connections.connections {
  266. if len(c.GetTransfers()) > 0 {
  267. activeConns++
  268. }
  269. }
  270. Connections.RUnlock()
  271. logger.Info(logSender, "", "number of connections with active transfers: %d", activeConns)
  272. return activeConns
  273. }
  274. // LimitRate blocks until all the configured rate limiters
  275. // allow one event to happen.
  276. // It returns an error if the time to wait exceeds the max
  277. // allowed delay
  278. func LimitRate(protocol, ip string) (time.Duration, error) {
  279. if Config.rateLimitersList != nil {
  280. isListed, _, err := Config.rateLimitersList.IsListed(ip, protocol)
  281. if err == nil && isListed {
  282. return 0, nil
  283. }
  284. }
  285. for _, limiter := range rateLimiters[protocol] {
  286. if delay, err := limiter.Wait(ip, protocol); err != nil {
  287. logger.Debug(logSender, "", "protocol %s ip %s: %v", protocol, ip, err)
  288. return delay, err
  289. }
  290. }
  291. return 0, nil
  292. }
  293. // Reload reloads the whitelist, the IP filter plugin and the defender's block and safe lists
  294. func Reload() error {
  295. plugin.Handler.ReloadFilter()
  296. return nil
  297. }
  298. // IsBanned returns true if the specified IP address is banned
  299. func IsBanned(ip, protocol string) bool {
  300. if plugin.Handler.IsIPBanned(ip, protocol) {
  301. return true
  302. }
  303. if Config.defender == nil {
  304. return false
  305. }
  306. return Config.defender.IsBanned(ip, protocol)
  307. }
  308. // GetDefenderBanTime returns the ban time for the given IP
  309. // or nil if the IP is not banned or the defender is disabled
  310. func GetDefenderBanTime(ip string) (*time.Time, error) {
  311. if Config.defender == nil {
  312. return nil, nil
  313. }
  314. return Config.defender.GetBanTime(ip)
  315. }
  316. // GetDefenderHosts returns hosts that are banned or for which some violations have been detected
  317. func GetDefenderHosts() ([]dataprovider.DefenderEntry, error) {
  318. if Config.defender == nil {
  319. return nil, nil
  320. }
  321. return Config.defender.GetHosts()
  322. }
  323. // GetDefenderHost returns a defender host by ip, if any
  324. func GetDefenderHost(ip string) (dataprovider.DefenderEntry, error) {
  325. if Config.defender == nil {
  326. return dataprovider.DefenderEntry{}, errors.New("defender is disabled")
  327. }
  328. return Config.defender.GetHost(ip)
  329. }
  330. // DeleteDefenderHost removes the specified IP address from the defender lists
  331. func DeleteDefenderHost(ip string) bool {
  332. if Config.defender == nil {
  333. return false
  334. }
  335. return Config.defender.DeleteHost(ip)
  336. }
  337. // GetDefenderScore returns the score for the given IP
  338. func GetDefenderScore(ip string) (int, error) {
  339. if Config.defender == nil {
  340. return 0, nil
  341. }
  342. return Config.defender.GetScore(ip)
  343. }
  344. // AddDefenderEvent adds the specified defender event for the given IP
  345. func AddDefenderEvent(ip, protocol string, event HostEvent) {
  346. if Config.defender == nil {
  347. return
  348. }
  349. Config.defender.AddEvent(ip, protocol, event)
  350. }
  351. func startPeriodicChecks(duration time.Duration, isShared int) {
  352. startEventScheduler()
  353. spec := fmt.Sprintf("@every %s", duration)
  354. _, err := eventScheduler.AddFunc(spec, Connections.checkTransfers)
  355. util.PanicOnError(err)
  356. logger.Info(logSender, "", "scheduled overquota transfers check, schedule %q", spec)
  357. if isShared == 1 {
  358. logger.Info(logSender, "", "add reload configs task")
  359. _, err := eventScheduler.AddFunc("@every 10m", smtp.ReloadProviderConf)
  360. util.PanicOnError(err)
  361. }
  362. if Config.IdleTimeout > 0 {
  363. ratio := idleTimeoutCheckInterval / periodicTimeoutCheckInterval
  364. spec = fmt.Sprintf("@every %s", duration*ratio)
  365. _, err = eventScheduler.AddFunc(spec, Connections.checkIdles)
  366. util.PanicOnError(err)
  367. logger.Info(logSender, "", "scheduled idle connections check, schedule %q", spec)
  368. }
  369. }
  // ActiveTransfer defines the interface for the current active transfers
  type ActiveTransfer interface {
  	// accessors
  	GetID() int64
  	GetType() int
  	GetVirtualPath() string
  	GetStartTime() time.Time
  	GetRealFsPath(fsPath string) string

  	// size accessors
  	GetSize() int64
  	GetDownloadedSize() int64
  	GetUploadedSize() int64
  	GetTruncatedSize() int64
  	HasSizeLimit() bool

  	// operations
  	SignalClose(err error)
  	Truncate(fsPath string, size int64) (int64, error)
  	SetTimes(fsPath string, atime time.Time, mtime time.Time) bool
  }
  386. // ActiveConnection defines the interface for the current active connections
  387. type ActiveConnection interface {
  388. GetID() string
  389. GetUsername() string
  390. GetRole() string
  391. GetMaxSessions() int
  392. GetLocalAddress() string
  393. GetRemoteAddress() string
  394. GetClientVersion() string
  395. GetProtocol() string
  396. GetConnectionTime() time.Time
  397. GetLastActivity() time.Time
  398. GetCommand() string
  399. Disconnect() error
  400. AddTransfer(t ActiveTransfer)
  401. RemoveTransfer(t ActiveTransfer)
  402. GetTransfers() []ConnectionTransfer
  403. SignalTransferClose(transferID int64, err error)
  404. CloseFS() error
  405. }
  // StatAttributes defines the attributes for set stat commands
  type StatAttributes struct {
  	Mode         os.FileMode
  	Atime, Mtime time.Time
  	UID, GID     int
  	// Flags is presumably a combination of the StatAttr* flags; verify
  	// against the callers
  	Flags int
  	Size  int64
  }
  // ConnectionTransfer defines the transfer details.
  // The fields tagged with `json:"-"` are excluded from the JSON
  // representation.
  type ConnectionTransfer struct {
  	ID            int64  `json:"-"`
  	OperationType string `json:"operation_type"`
  	// StartTime is handled as milliseconds since epoch by
  	// getConnectionTransferAsString
  	StartTime    int64  `json:"start_time"`
  	Size         int64  `json:"size"`
  	VirtualPath  string `json:"path"`
  	HasSizeLimit bool   `json:"-"`
  	ULSize       int64  `json:"-"`
  	DLSize       int64  `json:"-"`
  }
  427. func (t *ConnectionTransfer) getConnectionTransferAsString() string {
  428. result := ""
  429. switch t.OperationType {
  430. case operationUpload:
  431. result += "UL "
  432. case operationDownload:
  433. result += "DL "
  434. }
  435. result += fmt.Sprintf("%q ", t.VirtualPath)
  436. if t.Size > 0 {
  437. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(t.StartTime))
  438. speed := float64(t.Size) / float64(util.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  439. result += fmt.Sprintf("Size: %s Elapsed: %s Speed: \"%.1f KB/s\"", util.ByteCountIEC(t.Size),
  440. util.GetDurationAsString(elapsed), speed)
  441. }
  442. return result
  443. }
  444. // Configuration defines configuration parameters common to all supported protocols
  445. type Configuration struct {
  446. // Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
  447. // 0 means disabled
  448. IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
  449. // UploadMode 0 means standard, the files are uploaded directly to the requested path.
  450. // 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
  451. // when the client ends the upload. Atomic mode avoid problems such as a web server that
  452. // serves partial files when the files are being uploaded.
  453. // In atomic mode if there is an upload error the temporary file is deleted and so the requested
  454. // upload path will not contain a partial file.
  455. // 2 means atomic with resume support: as atomic but if there is an upload error the temporary
  456. // file is renamed to the requested path and not deleted, this way a client can reconnect and resume
  457. // the upload.
  458. UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
  459. // Actions to execute for SFTP file operations and SSH commands
  460. Actions ProtocolActions `json:"actions" mapstructure:"actions"`
  461. // SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
  462. // 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
  463. // 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group are
  464. // silently ignored for cloud based filesystem such as S3, GCS, Azure Blob. Requests for changing
  465. // modification times are ignored for cloud based filesystem if they are not supported.
  466. SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
  467. // RenameMode defines how to handle directory renames. By default, renaming of non-empty directories
  468. // is not allowed for cloud storage providers (S3, GCS, Azure Blob). Set to 1 to enable recursive
  469. // renames for these providers, they may be slow, there is no atomic rename API like for local
  470. // filesystem, so SFTPGo will recursively list the directory contents and do a rename for each entry
  471. RenameMode int `json:"rename_mode" mapstructure:"rename_mode"`
  472. // TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
  473. // If you set this option you must make sure that the defined path exists, is accessible for writing
  474. // by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise
  475. // the renaming for atomic uploads will become a copy and therefore may take a long time.
  476. // The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
  477. TempPath string `json:"temp_path" mapstructure:"temp_path"`
  478. // Support for HAProxy PROXY protocol.
  479. // If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGNIX, you can enable
  480. // the proxy protocol. It provides a convenient way to safely transport connection information
  481. // such as a client's address across multiple layers of NAT or TCP proxies to get the real
  482. // client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
  483. // - 0 means disabled
  484. // - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
  485. // - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
  486. // If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
  487. // for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
  488. ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
  489. // List of IP addresses and IP ranges allowed to send the proxy header.
  490. // If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
  491. // connection will be accepted and the header will be ignored.
  492. // If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
  493. // connection will be rejected.
  494. ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
  495. // List of IP addresses and IP ranges for which not to read the proxy header
  496. ProxySkipped []string `json:"proxy_skipped" mapstructure:"proxy_skipped"`
  497. // Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
  498. // If you define an HTTP URL it will be invoked using a `GET` request.
  499. // Please note that SFTPGo services may not yet be available when this hook is run.
  500. // Leave empty do disable.
  501. StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
  502. // Absolute path to an external program or an HTTP URL to invoke after a user connects
  503. // and before he tries to login. It allows you to reject the connection based on the source
  504. // ip address. Leave empty do disable.
  505. PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
  506. // Absolute path to an external program or an HTTP URL to invoke after an SSH/FTP connection ends.
  507. // Leave empty do disable.
  508. PostDisconnectHook string `json:"post_disconnect_hook" mapstructure:"post_disconnect_hook"`
  509. // Absolute path to an external program or an HTTP URL to invoke after a data retention check completes.
  510. // Leave empty do disable.
  511. DataRetentionHook string `json:"data_retention_hook" mapstructure:"data_retention_hook"`
  512. // Maximum number of concurrent client connections. 0 means unlimited
  513. MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
  514. // Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
  515. MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
  516. // Defines the status of the global allow list. 0 means disabled, 1 enabled.
  517. // If enabled, only the listed IPs/networks can access the configured services, all other
  518. // client connections will be dropped before they even try to authenticate.
  519. // Ensure to enable this setting only after adding some allowed ip/networks from the WebAdmin/REST API
  520. AllowListStatus int `json:"allowlist_status" mapstructure:"allowlist_status"`
  521. // Allow users on this instance to use other users/virtual folders on this instance as storage backend.
  522. // Enable this setting if you know what you are doing.
  523. AllowSelfConnections int `json:"allow_self_connections" mapstructure:"allow_self_connections"`
  524. // Defender configuration
  525. DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
  526. // Rate limiter configurations
  527. RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
  528. idleTimeoutAsDuration time.Duration
  529. idleLoginTimeout time.Duration
  530. defender Defender
  531. allowList *dataprovider.IPList
  532. rateLimitersList *dataprovider.IPList
  533. proxyAllowed []func(net.IP) bool
  534. proxySkipped []func(net.IP) bool
  535. }
  536. // IsAtomicUploadEnabled returns true if atomic upload is enabled
  537. func (c *Configuration) IsAtomicUploadEnabled() bool {
  538. return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
  539. }
  540. func (c *Configuration) initializeProxyProtocol() error {
  541. if c.ProxyProtocol > 0 {
  542. allowed, err := util.ParseAllowedIPAndRanges(c.ProxyAllowed)
  543. if err != nil {
  544. return fmt.Errorf("invalid proxy allowed: %w", err)
  545. }
  546. skipped, err := util.ParseAllowedIPAndRanges(c.ProxySkipped)
  547. if err != nil {
  548. return fmt.Errorf("invalid proxy skipped: %w", err)
  549. }
  550. Config.proxyAllowed = allowed
  551. Config.proxySkipped = skipped
  552. }
  553. return nil
  554. }
  555. // GetProxyListener returns a wrapper for the given listener that supports the
  556. // HAProxy Proxy Protocol
  557. func (c *Configuration) GetProxyListener(listener net.Listener) (*proxyproto.Listener, error) {
  558. if c.ProxyProtocol > 0 {
  559. defaultPolicy := proxyproto.REQUIRE
  560. if c.ProxyProtocol == 1 {
  561. defaultPolicy = proxyproto.IGNORE
  562. }
  563. return &proxyproto.Listener{
  564. Listener: listener,
  565. Policy: getProxyPolicy(c.proxyAllowed, c.proxySkipped, defaultPolicy),
  566. ReadHeaderTimeout: 10 * time.Second,
  567. }, nil
  568. }
  569. return nil, errors.New("proxy protocol not configured")
  570. }
  571. // GetRateLimitersStatus returns the rate limiters status
  572. func (c *Configuration) GetRateLimitersStatus() (bool, []string) {
  573. enabled := false
  574. var protocols []string
  575. for _, rlCfg := range c.RateLimitersConfig {
  576. if rlCfg.isEnabled() {
  577. enabled = true
  578. protocols = append(protocols, rlCfg.Protocols...)
  579. }
  580. }
  581. return enabled, util.RemoveDuplicates(protocols, false)
  582. }
  583. // IsAllowListEnabled returns true if the global allow list is enabled
  584. func (c *Configuration) IsAllowListEnabled() bool {
  585. return c.AllowListStatus > 0
  586. }
  587. // ExecuteStartupHook runs the startup hook if defined
  588. func (c *Configuration) ExecuteStartupHook() error {
  589. if c.StartupHook == "" {
  590. return nil
  591. }
  592. if strings.HasPrefix(c.StartupHook, "http") {
  593. var url *url.URL
  594. url, err := url.Parse(c.StartupHook)
  595. if err != nil {
  596. logger.Warn(logSender, "", "Invalid startup hook %q: %v", c.StartupHook, err)
  597. return err
  598. }
  599. startTime := time.Now()
  600. resp, err := httpclient.RetryableGet(url.String())
  601. if err != nil {
  602. logger.Warn(logSender, "", "Error executing startup hook: %v", err)
  603. return err
  604. }
  605. defer resp.Body.Close()
  606. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
  607. return nil
  608. }
  609. if !filepath.IsAbs(c.StartupHook) {
  610. err := fmt.Errorf("invalid startup hook %q", c.StartupHook)
  611. logger.Warn(logSender, "", "Invalid startup hook %q", c.StartupHook)
  612. return err
  613. }
  614. startTime := time.Now()
  615. timeout, env, args := command.GetConfig(c.StartupHook, command.HookStartup)
  616. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  617. defer cancel()
  618. cmd := exec.CommandContext(ctx, c.StartupHook, args...)
  619. cmd.Env = env
  620. err := cmd.Run()
  621. logger.Debug(logSender, "", "Startup hook executed, elapsed: %s, error: %v", time.Since(startTime), err)
  622. return nil
  623. }
  624. func (c *Configuration) executePostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  625. startNewHook()
  626. defer hookEnded()
  627. ipAddr := util.GetIPFromRemoteAddress(remoteAddr)
  628. connDuration := int64(time.Since(connectionTime) / time.Millisecond)
  629. if strings.HasPrefix(c.PostDisconnectHook, "http") {
  630. var url *url.URL
  631. url, err := url.Parse(c.PostDisconnectHook)
  632. if err != nil {
  633. logger.Warn(protocol, connID, "Invalid post disconnect hook %q: %v", c.PostDisconnectHook, err)
  634. return
  635. }
  636. q := url.Query()
  637. q.Add("ip", ipAddr)
  638. q.Add("protocol", protocol)
  639. q.Add("username", username)
  640. q.Add("connection_duration", strconv.FormatInt(connDuration, 10))
  641. url.RawQuery = q.Encode()
  642. startTime := time.Now()
  643. resp, err := httpclient.RetryableGet(url.String())
  644. respCode := 0
  645. if err == nil {
  646. respCode = resp.StatusCode
  647. resp.Body.Close()
  648. }
  649. logger.Debug(protocol, connID, "Post disconnect hook response code: %v, elapsed: %v, err: %v",
  650. respCode, time.Since(startTime), err)
  651. return
  652. }
  653. if !filepath.IsAbs(c.PostDisconnectHook) {
  654. logger.Debug(protocol, connID, "invalid post disconnect hook %q", c.PostDisconnectHook)
  655. return
  656. }
  657. timeout, env, args := command.GetConfig(c.PostDisconnectHook, command.HookPostDisconnect)
  658. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  659. defer cancel()
  660. startTime := time.Now()
  661. cmd := exec.CommandContext(ctx, c.PostDisconnectHook, args...)
  662. cmd.Env = append(env,
  663. fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
  664. fmt.Sprintf("SFTPGO_CONNECTION_USERNAME=%s", username),
  665. fmt.Sprintf("SFTPGO_CONNECTION_DURATION=%d", connDuration),
  666. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
  667. err := cmd.Run()
  668. logger.Debug(protocol, connID, "Post disconnect hook executed, elapsed: %s error: %v", time.Since(startTime), err)
  669. }
  670. func (c *Configuration) checkPostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  671. if c.PostDisconnectHook == "" {
  672. return
  673. }
  674. if !util.Contains(disconnHookProtocols, protocol) {
  675. return
  676. }
  677. go c.executePostDisconnectHook(remoteAddr, protocol, username, connID, connectionTime)
  678. }
  679. // ExecutePostConnectHook executes the post connect hook if defined
  680. func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
  681. if c.PostConnectHook == "" {
  682. return nil
  683. }
  684. if strings.HasPrefix(c.PostConnectHook, "http") {
  685. var url *url.URL
  686. url, err := url.Parse(c.PostConnectHook)
  687. if err != nil {
  688. logger.Warn(protocol, "", "Login from ip %q denied, invalid post connect hook %q: %v",
  689. ipAddr, c.PostConnectHook, err)
  690. return getPermissionDeniedError(protocol)
  691. }
  692. q := url.Query()
  693. q.Add("ip", ipAddr)
  694. q.Add("protocol", protocol)
  695. url.RawQuery = q.Encode()
  696. resp, err := httpclient.RetryableGet(url.String())
  697. if err != nil {
  698. logger.Warn(protocol, "", "Login from ip %q denied, error executing post connect hook: %v", ipAddr, err)
  699. return getPermissionDeniedError(protocol)
  700. }
  701. defer resp.Body.Close()
  702. if resp.StatusCode != http.StatusOK {
  703. logger.Warn(protocol, "", "Login from ip %q denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
  704. return getPermissionDeniedError(protocol)
  705. }
  706. return nil
  707. }
  708. if !filepath.IsAbs(c.PostConnectHook) {
  709. err := fmt.Errorf("invalid post connect hook %q", c.PostConnectHook)
  710. logger.Warn(protocol, "", "Login from ip %q denied: %v", ipAddr, err)
  711. return getPermissionDeniedError(protocol)
  712. }
  713. timeout, env, args := command.GetConfig(c.PostConnectHook, command.HookPostConnect)
  714. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  715. defer cancel()
  716. cmd := exec.CommandContext(ctx, c.PostConnectHook, args...)
  717. cmd.Env = append(env,
  718. fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
  719. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
  720. err := cmd.Run()
  721. if err != nil {
  722. logger.Warn(protocol, "", "Login from ip %q denied, connect hook error: %v", ipAddr, err)
  723. return getPermissionDeniedError(protocol)
  724. }
  725. return nil
  726. }
  727. func getProxyPolicy(allowed, skipped []func(net.IP) bool, def proxyproto.Policy) proxyproto.PolicyFunc {
  728. return func(upstream net.Addr) (proxyproto.Policy, error) {
  729. upstreamIP, err := util.GetIPFromNetAddr(upstream)
  730. if err != nil {
  731. // something is wrong with the source IP, better reject the connection
  732. return proxyproto.REJECT, err
  733. }
  734. for _, skippedFrom := range skipped {
  735. if skippedFrom(upstreamIP) {
  736. return proxyproto.SKIP, nil
  737. }
  738. }
  739. for _, allowFrom := range allowed {
  740. if allowFrom(upstreamIP) {
  741. return proxyproto.USE, nil
  742. }
  743. }
  744. return def, nil
  745. }
  746. }
  // SSHConnection describes a single SSH connection.
  // Several SFTP or SSH command channels can be opened on top of one SSH connection
  type SSHConnection struct {
  	// identifier of the connection
  	id string
  	// underlying network connection
  	conn net.Conn
  	// time of the last activity, stored as Unix nanoseconds
  	lastActivity atomic.Int64
  }
  754. // NewSSHConnection returns a new SSHConnection
  755. func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
  756. c := &SSHConnection{
  757. id: id,
  758. conn: conn,
  759. }
  760. c.lastActivity.Store(time.Now().UnixNano())
  761. return c
  762. }
  763. // GetID returns the ID for this SSHConnection
  764. func (c *SSHConnection) GetID() string {
  765. return c.id
  766. }
  767. // UpdateLastActivity updates last activity for this connection
  768. func (c *SSHConnection) UpdateLastActivity() {
  769. c.lastActivity.Store(time.Now().UnixNano())
  770. }
  771. // GetLastActivity returns the last connection activity
  772. func (c *SSHConnection) GetLastActivity() time.Time {
  773. return time.Unix(0, c.lastActivity.Load())
  774. }
  775. // Close closes the underlying network connection
  776. func (c *SSHConnection) Close() error {
  777. return c.conn.Close()
  778. }
  779. // ActiveConnections holds the currect active connections with the associated transfers
  780. type ActiveConnections struct {
  781. // clients contains both authenticated and estabilished connections and the ones waiting
  782. // for authentication
  783. clients clientsMap
  784. transfersCheckStatus atomic.Bool
  785. sync.RWMutex
  786. connections []ActiveConnection
  787. mapping map[string]int
  788. sshConnections []*SSHConnection
  789. sshMapping map[string]int
  790. perUserConns map[string]int
  791. }
  792. // internal method, must be called within a locked block
  793. func (conns *ActiveConnections) addUserConnection(username string) {
  794. if username == "" {
  795. return
  796. }
  797. conns.perUserConns[username]++
  798. }
  799. // internal method, must be called within a locked block
  800. func (conns *ActiveConnections) removeUserConnection(username string) {
  801. if username == "" {
  802. return
  803. }
  804. if val, ok := conns.perUserConns[username]; ok {
  805. conns.perUserConns[username]--
  806. if val > 1 {
  807. return
  808. }
  809. delete(conns.perUserConns, username)
  810. }
  811. }
  812. // GetActiveSessions returns the number of active sessions for the given username.
  813. // We return the open sessions for any protocol
  814. func (conns *ActiveConnections) GetActiveSessions(username string) int {
  815. conns.RLock()
  816. defer conns.RUnlock()
  817. return conns.perUserConns[username]
  818. }
  819. // Add adds a new connection to the active ones
  820. func (conns *ActiveConnections) Add(c ActiveConnection) error {
  821. conns.Lock()
  822. defer conns.Unlock()
  823. if username := c.GetUsername(); username != "" {
  824. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  825. if val := conns.perUserConns[username]; val >= maxSessions {
  826. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  827. }
  828. }
  829. conns.addUserConnection(username)
  830. }
  831. conns.mapping[c.GetID()] = len(conns.connections)
  832. conns.connections = append(conns.connections, c)
  833. metric.UpdateActiveConnectionsSize(len(conns.connections))
  834. logger.Debug(c.GetProtocol(), c.GetID(), "connection added, local address %q, remote address %q, num open connections: %d",
  835. c.GetLocalAddress(), c.GetRemoteAddress(), len(conns.connections))
  836. return nil
  837. }
  838. // Swap replaces an existing connection with the given one.
  839. // This method is useful if you have to change some connection details
  840. // for example for FTP is used to update the connection once the user
  841. // authenticates
  842. func (conns *ActiveConnections) Swap(c ActiveConnection) error {
  843. conns.Lock()
  844. defer conns.Unlock()
  845. if idx, ok := conns.mapping[c.GetID()]; ok {
  846. conn := conns.connections[idx]
  847. conns.removeUserConnection(conn.GetUsername())
  848. if username := c.GetUsername(); username != "" {
  849. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  850. if val, ok := conns.perUserConns[username]; ok && val >= maxSessions {
  851. conns.addUserConnection(conn.GetUsername())
  852. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  853. }
  854. }
  855. conns.addUserConnection(username)
  856. }
  857. err := conn.CloseFS()
  858. conns.connections[idx] = c
  859. logger.Debug(logSender, c.GetID(), "connection swapped, close fs error: %v", err)
  860. conn = nil
  861. return nil
  862. }
  863. return errors.New("connection to swap not found")
  864. }
  865. // Remove removes a connection from the active ones
  866. func (conns *ActiveConnections) Remove(connectionID string) {
  867. conns.Lock()
  868. defer conns.Unlock()
  869. if idx, ok := conns.mapping[connectionID]; ok {
  870. conn := conns.connections[idx]
  871. err := conn.CloseFS()
  872. lastIdx := len(conns.connections) - 1
  873. conns.connections[idx] = conns.connections[lastIdx]
  874. conns.connections[lastIdx] = nil
  875. conns.connections = conns.connections[:lastIdx]
  876. delete(conns.mapping, connectionID)
  877. if idx != lastIdx {
  878. conns.mapping[conns.connections[idx].GetID()] = idx
  879. }
  880. conns.removeUserConnection(conn.GetUsername())
  881. metric.UpdateActiveConnectionsSize(lastIdx)
  882. logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, local address %q, remote address %q close fs error: %v, num open connections: %d",
  883. conn.GetLocalAddress(), conn.GetRemoteAddress(), err, lastIdx)
  884. if conn.GetProtocol() == ProtocolFTP && conn.GetUsername() == "" && !util.Contains(ftpLoginCommands, conn.GetCommand()) {
  885. ip := util.GetIPFromRemoteAddress(conn.GetRemoteAddress())
  886. logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTryed, conn.GetProtocol(),
  887. dataprovider.ErrNoAuthTryed.Error())
  888. metric.AddNoAuthTryed()
  889. AddDefenderEvent(ip, ProtocolFTP, HostEventNoLoginTried)
  890. dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTryed, ip,
  891. conn.GetProtocol(), dataprovider.ErrNoAuthTryed)
  892. }
  893. Config.checkPostDisconnectHook(conn.GetRemoteAddress(), conn.GetProtocol(), conn.GetUsername(),
  894. conn.GetID(), conn.GetConnectionTime())
  895. return
  896. }
  897. logger.Debug(logSender, "", "connection id %q to remove not found!", connectionID)
  898. }
  899. // Close closes an active connection.
  900. // It returns true on success
  901. func (conns *ActiveConnections) Close(connectionID, role string) bool {
  902. conns.RLock()
  903. var result bool
  904. if idx, ok := conns.mapping[connectionID]; ok {
  905. c := conns.connections[idx]
  906. if role == "" || c.GetRole() == role {
  907. defer func(conn ActiveConnection) {
  908. err := conn.Disconnect()
  909. logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
  910. }(c)
  911. result = true
  912. }
  913. }
  914. conns.RUnlock()
  915. return result
  916. }
  917. // AddSSHConnection adds a new ssh connection to the active ones
  918. func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
  919. conns.Lock()
  920. defer conns.Unlock()
  921. conns.sshMapping[c.GetID()] = len(conns.sshConnections)
  922. conns.sshConnections = append(conns.sshConnections, c)
  923. logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %d", len(conns.sshConnections))
  924. }
  925. // RemoveSSHConnection removes a connection from the active ones
  926. func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
  927. conns.Lock()
  928. defer conns.Unlock()
  929. if idx, ok := conns.sshMapping[connectionID]; ok {
  930. lastIdx := len(conns.sshConnections) - 1
  931. conns.sshConnections[idx] = conns.sshConnections[lastIdx]
  932. conns.sshConnections[lastIdx] = nil
  933. conns.sshConnections = conns.sshConnections[:lastIdx]
  934. delete(conns.sshMapping, connectionID)
  935. if idx != lastIdx {
  936. conns.sshMapping[conns.sshConnections[idx].GetID()] = idx
  937. }
  938. logger.Debug(logSender, connectionID, "ssh connection removed, num open ssh connections: %d", lastIdx)
  939. return
  940. }
  941. logger.Warn(logSender, "", "ssh connection to remove with id %q not found!", connectionID)
  942. }
  943. func (conns *ActiveConnections) checkIdles() {
  944. conns.RLock()
  945. for _, sshConn := range conns.sshConnections {
  946. idleTime := time.Since(sshConn.GetLastActivity())
  947. if idleTime > Config.idleTimeoutAsDuration {
  948. // we close an SSH connection if it has no active connections associated
  949. idToMatch := fmt.Sprintf("_%s_", sshConn.GetID())
  950. toClose := true
  951. for _, conn := range conns.connections {
  952. if strings.Contains(conn.GetID(), idToMatch) {
  953. if time.Since(conn.GetLastActivity()) <= Config.idleTimeoutAsDuration {
  954. toClose = false
  955. break
  956. }
  957. }
  958. }
  959. if toClose {
  960. defer func(c *SSHConnection) {
  961. err := c.Close()
  962. logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
  963. time.Since(c.GetLastActivity()), err)
  964. }(sshConn)
  965. }
  966. }
  967. }
  968. for _, c := range conns.connections {
  969. idleTime := time.Since(c.GetLastActivity())
  970. isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
  971. if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
  972. defer func(conn ActiveConnection) {
  973. err := conn.Disconnect()
  974. logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %v, username: %q close err: %v",
  975. time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
  976. }(c)
  977. }
  978. }
  979. conns.RUnlock()
  980. }
  981. func (conns *ActiveConnections) checkTransfers() {
  982. if conns.transfersCheckStatus.Load() {
  983. logger.Warn(logSender, "", "the previous transfer check is still running, skipping execution")
  984. return
  985. }
  986. conns.transfersCheckStatus.Store(true)
  987. defer conns.transfersCheckStatus.Store(false)
  988. conns.RLock()
  989. if len(conns.connections) < 2 {
  990. conns.RUnlock()
  991. return
  992. }
  993. var wg sync.WaitGroup
  994. logger.Debug(logSender, "", "start concurrent transfers check")
  995. // update the current size for transfers to monitors
  996. for _, c := range conns.connections {
  997. for _, t := range c.GetTransfers() {
  998. if t.HasSizeLimit {
  999. wg.Add(1)
  1000. go func(transfer ConnectionTransfer, connID string) {
  1001. defer wg.Done()
  1002. transfersChecker.UpdateTransferCurrentSizes(transfer.ULSize, transfer.DLSize, transfer.ID, connID)
  1003. }(t, c.GetID())
  1004. }
  1005. }
  1006. }
  1007. conns.RUnlock()
  1008. logger.Debug(logSender, "", "waiting for the update of the transfers current size")
  1009. wg.Wait()
  1010. logger.Debug(logSender, "", "getting overquota transfers")
  1011. overquotaTransfers := transfersChecker.GetOverquotaTransfers()
  1012. logger.Debug(logSender, "", "number of overquota transfers: %v", len(overquotaTransfers))
  1013. if len(overquotaTransfers) == 0 {
  1014. return
  1015. }
  1016. conns.RLock()
  1017. defer conns.RUnlock()
  1018. for _, c := range conns.connections {
  1019. for _, overquotaTransfer := range overquotaTransfers {
  1020. if c.GetID() == overquotaTransfer.ConnID {
  1021. logger.Info(logSender, c.GetID(), "user %q is overquota, try to close transfer id %v",
  1022. c.GetUsername(), overquotaTransfer.TransferID)
  1023. var err error
  1024. if overquotaTransfer.TransferType == TransferDownload {
  1025. err = getReadQuotaExceededError(c.GetProtocol())
  1026. } else {
  1027. err = getQuotaExceededError(c.GetProtocol())
  1028. }
  1029. c.SignalTransferClose(overquotaTransfer.TransferID, err)
  1030. }
  1031. }
  1032. }
  1033. logger.Debug(logSender, "", "transfers check completed")
  1034. }
  1035. // AddClientConnection stores a new client connection
  1036. func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
  1037. conns.clients.add(ipAddr)
  1038. }
  1039. // RemoveClientConnection removes a disconnected client from the tracked ones
  1040. func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
  1041. conns.clients.remove(ipAddr)
  1042. }
  1043. // GetClientConnections returns the total number of client connections
  1044. func (conns *ActiveConnections) GetClientConnections() int32 {
  1045. return conns.clients.getTotal()
  1046. }
  1047. // IsNewConnectionAllowed returns an error if the maximum number of concurrent allowed
  1048. // connections is exceeded or a whitelist is defined and the specified ipAddr is not listed
  1049. // or the service is shutting down
  1050. func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr, protocol string) error {
  1051. if isShuttingDown.Load() {
  1052. return ErrShuttingDown
  1053. }
  1054. if Config.allowList != nil {
  1055. isListed, _, err := Config.allowList.IsListed(ipAddr, protocol)
  1056. if err != nil {
  1057. logger.Error(logSender, "", "unable to query allow list, connection denied, ip %q, protocol %s, err: %v",
  1058. ipAddr, protocol, err)
  1059. return ErrConnectionDenied
  1060. }
  1061. if !isListed {
  1062. return ErrConnectionDenied
  1063. }
  1064. }
  1065. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  1066. return nil
  1067. }
  1068. if Config.MaxPerHostConnections > 0 {
  1069. if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
  1070. logger.Info(logSender, "", "active connections from %s %d/%d", ipAddr, total, Config.MaxPerHostConnections)
  1071. AddDefenderEvent(ipAddr, protocol, HostEventLimitExceeded)
  1072. return ErrConnectionDenied
  1073. }
  1074. }
  1075. if Config.MaxTotalConnections > 0 {
  1076. if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
  1077. logger.Info(logSender, "", "active client connections %d/%d", total, Config.MaxTotalConnections)
  1078. return ErrConnectionDenied
  1079. }
  1080. // on a single SFTP connection we could have multiple SFTP channels or commands
  1081. // so we check the estabilished connections too
  1082. conns.RLock()
  1083. defer conns.RUnlock()
  1084. if sess := len(conns.connections); sess >= Config.MaxTotalConnections {
  1085. logger.Info(logSender, "", "active client sessions %d/%d", sess, Config.MaxTotalConnections)
  1086. return ErrConnectionDenied
  1087. }
  1088. }
  1089. return nil
  1090. }
  1091. // GetStats returns stats for active connections
  1092. func (conns *ActiveConnections) GetStats(role string) []ConnectionStatus {
  1093. conns.RLock()
  1094. defer conns.RUnlock()
  1095. stats := make([]ConnectionStatus, 0, len(conns.connections))
  1096. node := dataprovider.GetNodeName()
  1097. for _, c := range conns.connections {
  1098. if role == "" || c.GetRole() == role {
  1099. stat := ConnectionStatus{
  1100. Username: c.GetUsername(),
  1101. ConnectionID: c.GetID(),
  1102. ClientVersion: c.GetClientVersion(),
  1103. RemoteAddress: c.GetRemoteAddress(),
  1104. ConnectionTime: util.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
  1105. LastActivity: util.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
  1106. Protocol: c.GetProtocol(),
  1107. Command: c.GetCommand(),
  1108. Transfers: c.GetTransfers(),
  1109. Node: node,
  1110. }
  1111. stats = append(stats, stat)
  1112. }
  1113. }
  1114. return stats
  1115. }
  1116. // ConnectionStatus returns the status for an active connection
  1117. type ConnectionStatus struct {
  1118. // Logged in username
  1119. Username string `json:"username"`
  1120. // Unique identifier for the connection
  1121. ConnectionID string `json:"connection_id"`
  1122. // client's version string
  1123. ClientVersion string `json:"client_version,omitempty"`
  1124. // Remote address for this connection
  1125. RemoteAddress string `json:"remote_address"`
  1126. // Connection time as unix timestamp in milliseconds
  1127. ConnectionTime int64 `json:"connection_time"`
  1128. // Last activity as unix timestamp in milliseconds
  1129. LastActivity int64 `json:"last_activity"`
  1130. // Protocol for this connection
  1131. Protocol string `json:"protocol"`
  1132. // active uploads/downloads
  1133. Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
  1134. // SSH command or WebDAV method
  1135. Command string `json:"command,omitempty"`
  1136. // Node identifier, omitted for single node installations
  1137. Node string `json:"node,omitempty"`
  1138. }
  1139. // GetConnectionDuration returns the connection duration as string
  1140. func (c *ConnectionStatus) GetConnectionDuration() string {
  1141. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  1142. return util.GetDurationAsString(elapsed)
  1143. }
  1144. // GetConnectionInfo returns connection info.
  1145. // Protocol,Client Version and RemoteAddress are returned.
  1146. func (c *ConnectionStatus) GetConnectionInfo() string {
  1147. var result strings.Builder
  1148. result.WriteString(fmt.Sprintf("%v. Client: %q From: %q", c.Protocol, c.ClientVersion, c.RemoteAddress))
  1149. if c.Command == "" {
  1150. return result.String()
  1151. }
  1152. switch c.Protocol {
  1153. case ProtocolSSH, ProtocolFTP:
  1154. result.WriteString(fmt.Sprintf(". Command: %q", c.Command))
  1155. case ProtocolWebDAV:
  1156. result.WriteString(fmt.Sprintf(". Method: %q", c.Command))
  1157. }
  1158. return result.String()
  1159. }
  1160. // GetTransfersAsString returns the active transfers as string
  1161. func (c *ConnectionStatus) GetTransfersAsString() string {
  1162. result := ""
  1163. for _, t := range c.Transfers {
  1164. if result != "" {
  1165. result += ". "
  1166. }
  1167. result += t.getConnectionTransferAsString()
  1168. }
  1169. return result
  1170. }
  // ActiveQuotaScan describes a quota scan in progress for a user
  type ActiveQuotaScan struct {
  	// Username is the user the quota scan refers to
  	Username string `json:"username"`
  	// StartTime is the quota scan start time as unix timestamp in milliseconds
  	StartTime int64 `json:"start_time"`
  	// Role is excluded from the JSON serialization
  	Role string `json:"-"`
  }
  // ActiveVirtualFolderQuotaScan describes a quota scan in progress for a virtual folder
  type ActiveVirtualFolderQuotaScan struct {
  	// Name is the name of the folder the quota scan refers to
  	Name string `json:"name"`
  	// StartTime is the quota scan start time as unix timestamp in milliseconds
  	StartTime int64 `json:"start_time"`
  }
  1186. // ActiveScans holds the active quota scans
  1187. type ActiveScans struct {
  1188. sync.RWMutex
  1189. UserScans []ActiveQuotaScan
  1190. FolderScans []ActiveVirtualFolderQuotaScan
  1191. }
  1192. // GetUsersQuotaScans returns the active users quota scans
  1193. func (s *ActiveScans) GetUsersQuotaScans(role string) []ActiveQuotaScan {
  1194. s.RLock()
  1195. defer s.RUnlock()
  1196. scans := make([]ActiveQuotaScan, 0, len(s.UserScans))
  1197. for _, scan := range s.UserScans {
  1198. if role == "" || role == scan.Role {
  1199. scans = append(scans, ActiveQuotaScan{
  1200. Username: scan.Username,
  1201. StartTime: scan.StartTime,
  1202. })
  1203. }
  1204. }
  1205. return scans
  1206. }
  1207. // AddUserQuotaScan adds a user to the ones with active quota scans.
  1208. // Returns false if the user has a quota scan already running
  1209. func (s *ActiveScans) AddUserQuotaScan(username, role string) bool {
  1210. s.Lock()
  1211. defer s.Unlock()
  1212. for _, scan := range s.UserScans {
  1213. if scan.Username == username {
  1214. return false
  1215. }
  1216. }
  1217. s.UserScans = append(s.UserScans, ActiveQuotaScan{
  1218. Username: username,
  1219. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1220. Role: role,
  1221. })
  1222. return true
  1223. }
  1224. // RemoveUserQuotaScan removes a user from the ones with active quota scans.
  1225. // Returns false if the user has no active quota scans
  1226. func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
  1227. s.Lock()
  1228. defer s.Unlock()
  1229. for idx, scan := range s.UserScans {
  1230. if scan.Username == username {
  1231. lastIdx := len(s.UserScans) - 1
  1232. s.UserScans[idx] = s.UserScans[lastIdx]
  1233. s.UserScans = s.UserScans[:lastIdx]
  1234. return true
  1235. }
  1236. }
  1237. return false
  1238. }
  1239. // GetVFoldersQuotaScans returns the active quota scans for virtual folders
  1240. func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
  1241. s.RLock()
  1242. defer s.RUnlock()
  1243. scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
  1244. copy(scans, s.FolderScans)
  1245. return scans
  1246. }
  1247. // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
  1248. // Returns false if the folder has a quota scan already running
  1249. func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
  1250. s.Lock()
  1251. defer s.Unlock()
  1252. for _, scan := range s.FolderScans {
  1253. if scan.Name == folderName {
  1254. return false
  1255. }
  1256. }
  1257. s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
  1258. Name: folderName,
  1259. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1260. })
  1261. return true
  1262. }
  1263. // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
  1264. // Returns false if the folder has no active quota scans
  1265. func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
  1266. s.Lock()
  1267. defer s.Unlock()
  1268. for idx, scan := range s.FolderScans {
  1269. if scan.Name == folderName {
  1270. lastIdx := len(s.FolderScans) - 1
  1271. s.FolderScans[idx] = s.FolderScans[lastIdx]
  1272. s.FolderScans = s.FolderScans[:lastIdx]
  1273. return true
  1274. }
  1275. }
  1276. return false
  1277. }
  // MetadataCheck describes a metadata check in progress
  type MetadataCheck struct {
  	// Username is the user the metadata check refers to
  	Username string `json:"username"`
  	// StartTime is the check start time as unix timestamp in milliseconds
  	StartTime int64 `json:"start_time"`
  	// Role is excluded from the JSON serialization
  	Role string `json:"-"`
  }
  1286. // MetadataChecks holds the active metadata checks
  1287. type MetadataChecks struct {
  1288. sync.RWMutex
  1289. checks []MetadataCheck
  1290. }
  1291. // Get returns the active metadata checks
  1292. func (c *MetadataChecks) Get(role string) []MetadataCheck {
  1293. c.RLock()
  1294. defer c.RUnlock()
  1295. checks := make([]MetadataCheck, 0, len(c.checks))
  1296. for _, check := range c.checks {
  1297. if role == "" || role == check.Role {
  1298. checks = append(checks, MetadataCheck{
  1299. Username: check.Username,
  1300. StartTime: check.StartTime,
  1301. })
  1302. }
  1303. }
  1304. return checks
  1305. }
  1306. // Add adds a user to the ones with active metadata checks.
  1307. // Return false if a metadata check is already active for the specified user
  1308. func (c *MetadataChecks) Add(username, role string) bool {
  1309. c.Lock()
  1310. defer c.Unlock()
  1311. for idx := range c.checks {
  1312. if c.checks[idx].Username == username {
  1313. return false
  1314. }
  1315. }
  1316. c.checks = append(c.checks, MetadataCheck{
  1317. Username: username,
  1318. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1319. Role: role,
  1320. })
  1321. return true
  1322. }
  1323. // Remove removes a user from the ones with active metadata checks
  1324. func (c *MetadataChecks) Remove(username string) bool {
  1325. c.Lock()
  1326. defer c.Unlock()
  1327. for idx := range c.checks {
  1328. if c.checks[idx].Username == username {
  1329. lastIdx := len(c.checks) - 1
  1330. c.checks[idx] = c.checks[lastIdx]
  1331. c.checks = c.checks[:lastIdx]
  1332. return true
  1333. }
  1334. }
  1335. return false
  1336. }