common.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package common defines code shared among file transfer packages and protocols
  15. package common
  16. import (
  17. "context"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. "path/filepath"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/pires/go-proxyproto"
  32. "github.com/drakkan/sftpgo/v2/internal/command"
  33. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  34. "github.com/drakkan/sftpgo/v2/internal/httpclient"
  35. "github.com/drakkan/sftpgo/v2/internal/logger"
  36. "github.com/drakkan/sftpgo/v2/internal/metric"
  37. "github.com/drakkan/sftpgo/v2/internal/plugin"
  38. "github.com/drakkan/sftpgo/v2/internal/util"
  39. "github.com/drakkan/sftpgo/v2/internal/vfs"
  40. )
  41. // constants
  42. const (
  43. logSender = "common"
  44. uploadLogSender = "Upload"
  45. downloadLogSender = "Download"
  46. renameLogSender = "Rename"
  47. rmdirLogSender = "Rmdir"
  48. mkdirLogSender = "Mkdir"
  49. symlinkLogSender = "Symlink"
  50. removeLogSender = "Remove"
  51. chownLogSender = "Chown"
  52. chmodLogSender = "Chmod"
  53. chtimesLogSender = "Chtimes"
  54. copyLogSender = "Copy"
  55. truncateLogSender = "Truncate"
  56. operationDownload = "download"
  57. operationUpload = "upload"
  58. operationFirstDownload = "first-download"
  59. operationFirstUpload = "first-upload"
  60. operationDelete = "delete"
  61. operationCopy = "copy"
  62. // Pre-download action name
  63. OperationPreDownload = "pre-download"
  64. // Pre-upload action name
  65. OperationPreUpload = "pre-upload"
  66. operationPreDelete = "pre-delete"
  67. operationRename = "rename"
  68. operationMkdir = "mkdir"
  69. operationRmdir = "rmdir"
  70. // SSH command action name
  71. OperationSSHCmd = "ssh_cmd"
  72. chtimesFormat = "2006-01-02T15:04:05" // YYYY-MM-DDTHH:MM:SS
  73. idleTimeoutCheckInterval = 3 * time.Minute
  74. periodicTimeoutCheckInterval = 1 * time.Minute
  75. )
  76. // Stat flags
  77. const (
  78. StatAttrUIDGID = 1
  79. StatAttrPerms = 2
  80. StatAttrTimes = 4
  81. StatAttrSize = 8
  82. )
  83. // Transfer types
  84. const (
  85. TransferUpload = iota
  86. TransferDownload
  87. )
  88. // Supported protocols
  89. const (
  90. ProtocolSFTP = "SFTP"
  91. ProtocolSCP = "SCP"
  92. ProtocolSSH = "SSH"
  93. ProtocolFTP = "FTP"
  94. ProtocolWebDAV = "DAV"
  95. ProtocolHTTP = "HTTP"
  96. ProtocolHTTPShare = "HTTPShare"
  97. ProtocolDataRetention = "DataRetention"
  98. ProtocolOIDC = "OIDC"
  99. protocolEventAction = "EventAction"
  100. )
  101. // Upload modes
  102. const (
  103. UploadModeStandard = iota
  104. UploadModeAtomic
  105. UploadModeAtomicWithResume
  106. )
  107. func init() {
  108. Connections.clients = clientsMap{
  109. clients: make(map[string]int),
  110. }
  111. Connections.perUserConns = make(map[string]int)
  112. Connections.mapping = make(map[string]int)
  113. Connections.sshMapping = make(map[string]int)
  114. }
  115. // errors definitions
  116. var (
  117. ErrPermissionDenied = errors.New("permission denied")
  118. ErrNotExist = errors.New("no such file or directory")
  119. ErrOpUnsupported = errors.New("operation unsupported")
  120. ErrGenericFailure = errors.New("failure")
  121. ErrQuotaExceeded = errors.New("denying write due to space limit")
  122. ErrReadQuotaExceeded = errors.New("denying read due to quota limit")
  123. ErrConnectionDenied = errors.New("you are not allowed to connect")
  124. ErrNoBinding = errors.New("no binding configured")
  125. ErrCrtRevoked = errors.New("your certificate has been revoked")
  126. ErrNoCredentials = errors.New("no credential provided")
  127. ErrInternalFailure = errors.New("internal failure")
  128. ErrTransferAborted = errors.New("transfer aborted")
  129. ErrShuttingDown = errors.New("the service is shutting down")
  130. errNoTransfer = errors.New("requested transfer not found")
  131. errTransferMismatch = errors.New("transfer mismatch")
  132. )
// Package-level state shared by the protocol implementations.
var (
	// Config is the configuration for the supported protocols
	Config Configuration
	// Connections is the list of active connections
	Connections ActiveConnections
	// QuotaScans is the list of active quota scans
	QuotaScans ActiveScans
	// ActiveMetadataChecks holds the active metadata checks
	ActiveMetadataChecks MetadataChecks
	// transfersChecker is assigned by Initialize using getTransfersChecker
	transfersChecker TransfersChecker
	// supportedProtocols lists the protocol names considered supported.
	// NOTE(review): ProtocolDataRetention and protocolEventAction are not
	// included - presumably intentional, confirm against the users of this list
	supportedProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV,
		ProtocolHTTP, ProtocolHTTPShare, ProtocolOIDC}
	// disconnHookProtocols presumably lists the protocols for which the post
	// disconnect hook is executed - TODO confirm against checkPostDisconnectHook
	disconnHookProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP}
	// the map key is the protocol, for each protocol we can have multiple rate limiters
	rateLimiters map[string][]*rateLimiter
	// isShuttingDown is reset by Initialize, set by WaitForTransfers and
	// read by CheckClosing
	isShuttingDown atomic.Bool
	// ftpLoginCommands holds the names of the FTP commands "PASS" and "USER"
	ftpLoginCommands = []string{"PASS", "USER"}
)
  151. // Initialize sets the common configuration
  152. func Initialize(c Configuration, isShared int) error {
  153. isShuttingDown.Store(false)
  154. Config = c
  155. Config.Actions.ExecuteOn = util.RemoveDuplicates(Config.Actions.ExecuteOn, true)
  156. Config.Actions.ExecuteSync = util.RemoveDuplicates(Config.Actions.ExecuteSync, true)
  157. Config.ProxyAllowed = util.RemoveDuplicates(Config.ProxyAllowed, true)
  158. Config.idleLoginTimeout = 2 * time.Minute
  159. Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
  160. startPeriodicChecks(periodicTimeoutCheckInterval)
  161. Config.defender = nil
  162. Config.allowList = nil
  163. Config.rateLimitersList = nil
  164. rateLimiters = make(map[string][]*rateLimiter)
  165. for _, rlCfg := range c.RateLimitersConfig {
  166. if rlCfg.isEnabled() {
  167. if err := rlCfg.validate(); err != nil {
  168. return fmt.Errorf("rate limiters initialization error: %w", err)
  169. }
  170. rateLimiter := rlCfg.getLimiter()
  171. for _, protocol := range rlCfg.Protocols {
  172. rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
  173. }
  174. }
  175. }
  176. if len(rateLimiters) > 0 {
  177. rateLimitersList, err := dataprovider.NewIPList(dataprovider.IPListTypeRateLimiterSafeList)
  178. if err != nil {
  179. return fmt.Errorf("unable to initialize ratelimiters list: %w", err)
  180. }
  181. Config.rateLimitersList = rateLimitersList
  182. }
  183. if c.DefenderConfig.Enabled {
  184. if !util.Contains(supportedDefenderDrivers, c.DefenderConfig.Driver) {
  185. return fmt.Errorf("unsupported defender driver %q", c.DefenderConfig.Driver)
  186. }
  187. var defender Defender
  188. var err error
  189. switch c.DefenderConfig.Driver {
  190. case DefenderDriverProvider:
  191. defender, err = newDBDefender(&c.DefenderConfig)
  192. default:
  193. defender, err = newInMemoryDefender(&c.DefenderConfig)
  194. }
  195. if err != nil {
  196. return fmt.Errorf("defender initialization error: %v", err)
  197. }
  198. logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
  199. Config.defender = defender
  200. }
  201. if c.AllowListStatus > 0 {
  202. allowList, err := dataprovider.NewIPList(dataprovider.IPListTypeAllowList)
  203. if err != nil {
  204. return fmt.Errorf("unable to initialize the allow list: %w", err)
  205. }
  206. logger.Info(logSender, "", "allow list initialized")
  207. Config.allowList = allowList
  208. }
  209. vfs.SetTempPath(c.TempPath)
  210. dataprovider.SetTempPath(c.TempPath)
  211. vfs.SetAllowSelfConnections(c.AllowSelfConnections)
  212. vfs.SetRenameMode(c.RenameMode)
  213. dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
  214. transfersChecker = getTransfersChecker(isShared)
  215. return nil
  216. }
  217. // CheckClosing returns an error if the service is closing
  218. func CheckClosing() error {
  219. if isShuttingDown.Load() {
  220. return ErrShuttingDown
  221. }
  222. return nil
  223. }
  224. // WaitForTransfers waits, for the specified grace time, for currently ongoing
  225. // client-initiated transfer sessions to completes.
  226. // A zero graceTime means no wait
  227. func WaitForTransfers(graceTime int) {
  228. if graceTime == 0 {
  229. return
  230. }
  231. if isShuttingDown.Swap(true) {
  232. return
  233. }
  234. if activeHooks.Load() == 0 && getActiveConnections() == 0 {
  235. return
  236. }
  237. graceTimer := time.NewTimer(time.Duration(graceTime) * time.Second)
  238. ticker := time.NewTicker(3 * time.Second)
  239. for {
  240. select {
  241. case <-ticker.C:
  242. hooks := activeHooks.Load()
  243. logger.Info(logSender, "", "active hooks: %d", hooks)
  244. if hooks == 0 && getActiveConnections() == 0 {
  245. logger.Info(logSender, "", "no more active connections, graceful shutdown")
  246. ticker.Stop()
  247. graceTimer.Stop()
  248. return
  249. }
  250. case <-graceTimer.C:
  251. logger.Info(logSender, "", "grace time expired, hard shutdown")
  252. ticker.Stop()
  253. return
  254. }
  255. }
  256. }
  257. // getActiveConnections returns the number of connections with active transfers
  258. func getActiveConnections() int {
  259. var activeConns int
  260. Connections.RLock()
  261. for _, c := range Connections.connections {
  262. if len(c.GetTransfers()) > 0 {
  263. activeConns++
  264. }
  265. }
  266. Connections.RUnlock()
  267. logger.Info(logSender, "", "number of connections with active transfers: %d", activeConns)
  268. return activeConns
  269. }
  270. // LimitRate blocks until all the configured rate limiters
  271. // allow one event to happen.
  272. // It returns an error if the time to wait exceeds the max
  273. // allowed delay
  274. func LimitRate(protocol, ip string) (time.Duration, error) {
  275. if Config.rateLimitersList != nil {
  276. isListed, _, err := Config.rateLimitersList.IsListed(ip, protocol)
  277. if err == nil && isListed {
  278. return 0, nil
  279. }
  280. }
  281. for _, limiter := range rateLimiters[protocol] {
  282. if delay, err := limiter.Wait(ip, protocol); err != nil {
  283. logger.Debug(logSender, "", "protocol %s ip %s: %v", protocol, ip, err)
  284. return delay, err
  285. }
  286. }
  287. return 0, nil
  288. }
  289. // Reload reloads the whitelist, the IP filter plugin and the defender's block and safe lists
  290. func Reload() error {
  291. plugin.Handler.ReloadFilter()
  292. return nil
  293. }
  294. // IsBanned returns true if the specified IP address is banned
  295. func IsBanned(ip, protocol string) bool {
  296. if plugin.Handler.IsIPBanned(ip) {
  297. return true
  298. }
  299. if Config.defender == nil {
  300. return false
  301. }
  302. return Config.defender.IsBanned(ip, protocol)
  303. }
  304. // GetDefenderBanTime returns the ban time for the given IP
  305. // or nil if the IP is not banned or the defender is disabled
  306. func GetDefenderBanTime(ip string) (*time.Time, error) {
  307. if Config.defender == nil {
  308. return nil, nil
  309. }
  310. return Config.defender.GetBanTime(ip)
  311. }
  312. // GetDefenderHosts returns hosts that are banned or for which some violations have been detected
  313. func GetDefenderHosts() ([]dataprovider.DefenderEntry, error) {
  314. if Config.defender == nil {
  315. return nil, nil
  316. }
  317. return Config.defender.GetHosts()
  318. }
  319. // GetDefenderHost returns a defender host by ip, if any
  320. func GetDefenderHost(ip string) (dataprovider.DefenderEntry, error) {
  321. if Config.defender == nil {
  322. return dataprovider.DefenderEntry{}, errors.New("defender is disabled")
  323. }
  324. return Config.defender.GetHost(ip)
  325. }
  326. // DeleteDefenderHost removes the specified IP address from the defender lists
  327. func DeleteDefenderHost(ip string) bool {
  328. if Config.defender == nil {
  329. return false
  330. }
  331. return Config.defender.DeleteHost(ip)
  332. }
  333. // GetDefenderScore returns the score for the given IP
  334. func GetDefenderScore(ip string) (int, error) {
  335. if Config.defender == nil {
  336. return 0, nil
  337. }
  338. return Config.defender.GetScore(ip)
  339. }
  340. // AddDefenderEvent adds the specified defender event for the given IP
  341. func AddDefenderEvent(ip, protocol string, event HostEvent) {
  342. if Config.defender == nil {
  343. return
  344. }
  345. Config.defender.AddEvent(ip, protocol, event)
  346. }
  347. func startPeriodicChecks(duration time.Duration) {
  348. startEventScheduler()
  349. spec := fmt.Sprintf("@every %s", duration)
  350. _, err := eventScheduler.AddFunc(spec, Connections.checkTransfers)
  351. util.PanicOnError(err)
  352. logger.Info(logSender, "", "scheduled overquota transfers check, schedule %q", spec)
  353. if Config.IdleTimeout > 0 {
  354. ratio := idleTimeoutCheckInterval / periodicTimeoutCheckInterval
  355. spec = fmt.Sprintf("@every %s", duration*ratio)
  356. _, err = eventScheduler.AddFunc(spec, Connections.checkIdles)
  357. util.PanicOnError(err)
  358. logger.Info(logSender, "", "scheduled idle connections check, schedule %q", spec)
  359. }
  360. }
  361. // ActiveTransfer defines the interface for the current active transfers
  362. type ActiveTransfer interface {
  363. GetID() int64
  364. GetType() int
  365. GetSize() int64
  366. GetDownloadedSize() int64
  367. GetUploadedSize() int64
  368. GetVirtualPath() string
  369. GetStartTime() time.Time
  370. SignalClose(err error)
  371. Truncate(fsPath string, size int64) (int64, error)
  372. GetRealFsPath(fsPath string) string
  373. SetTimes(fsPath string, atime time.Time, mtime time.Time) bool
  374. GetTruncatedSize() int64
  375. HasSizeLimit() bool
  376. }
  377. // ActiveConnection defines the interface for the current active connections
  378. type ActiveConnection interface {
  379. GetID() string
  380. GetUsername() string
  381. GetRole() string
  382. GetMaxSessions() int
  383. GetLocalAddress() string
  384. GetRemoteAddress() string
  385. GetClientVersion() string
  386. GetProtocol() string
  387. GetConnectionTime() time.Time
  388. GetLastActivity() time.Time
  389. GetCommand() string
  390. Disconnect() error
  391. AddTransfer(t ActiveTransfer)
  392. RemoveTransfer(t ActiveTransfer)
  393. GetTransfers() []ConnectionTransfer
  394. SignalTransferClose(transferID int64, err error)
  395. CloseFS() error
  396. }
  397. // StatAttributes defines the attributes for set stat commands
  398. type StatAttributes struct {
  399. Mode os.FileMode
  400. Atime time.Time
  401. Mtime time.Time
  402. UID int
  403. GID int
  404. Flags int
  405. Size int64
  406. }
  407. // ConnectionTransfer defines the trasfer details
  408. type ConnectionTransfer struct {
  409. ID int64 `json:"-"`
  410. OperationType string `json:"operation_type"`
  411. StartTime int64 `json:"start_time"`
  412. Size int64 `json:"size"`
  413. VirtualPath string `json:"path"`
  414. HasSizeLimit bool `json:"-"`
  415. ULSize int64 `json:"-"`
  416. DLSize int64 `json:"-"`
  417. }
  418. func (t *ConnectionTransfer) getConnectionTransferAsString() string {
  419. result := ""
  420. switch t.OperationType {
  421. case operationUpload:
  422. result += "UL "
  423. case operationDownload:
  424. result += "DL "
  425. }
  426. result += fmt.Sprintf("%q ", t.VirtualPath)
  427. if t.Size > 0 {
  428. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(t.StartTime))
  429. speed := float64(t.Size) / float64(util.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  430. result += fmt.Sprintf("Size: %s Elapsed: %s Speed: \"%.1f KB/s\"", util.ByteCountIEC(t.Size),
  431. util.GetDurationAsString(elapsed), speed)
  432. }
  433. return result
  434. }
// Configuration defines configuration parameters common to all supported protocols
type Configuration struct {
	// Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
	// 0 means disabled
	IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
	// UploadMode 0 means standard, the files are uploaded directly to the requested path.
	// 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
	// when the client ends the upload. Atomic mode avoids problems such as a web server that
	// serves partial files when the files are being uploaded.
	// In atomic mode if there is an upload error the temporary file is deleted and so the requested
	// upload path will not contain a partial file.
	// 2 means atomic with resume support: as atomic but if there is an upload error the temporary
	// file is renamed to the requested path and not deleted, this way a client can reconnect and resume
	// the upload.
	UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
	// Actions to execute for SFTP file operations and SSH commands
	Actions ProtocolActions `json:"actions" mapstructure:"actions"`
	// SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
	// 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
	// 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group are
	// silently ignored for cloud based filesystems such as S3, GCS, Azure Blob. Requests for changing
	// modification times are ignored for cloud based filesystems if they are not supported.
	SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
	// RenameMode defines how to handle directory renames. By default, renaming of non-empty directories
	// is not allowed for cloud storage providers (S3, GCS, Azure Blob). Set to 1 to enable recursive
	// renames for these providers, they may be slow, there is no atomic rename API like for local
	// filesystem, so SFTPGo will recursively list the directory contents and do a rename for each entry
	RenameMode int `json:"rename_mode" mapstructure:"rename_mode"`
	// TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
	// If you set this option you must make sure that the defined path exists, is accessible for writing
	// by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise
	// the renaming for atomic uploads will become a copy and therefore may take a long time.
	// The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
	TempPath string `json:"temp_path" mapstructure:"temp_path"`
	// Support for HAProxy PROXY protocol.
	// If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGINX, you can enable
	// the proxy protocol. It provides a convenient way to safely transport connection information
	// such as a client's address across multiple layers of NAT or TCP proxies to get the real
	// client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
	// - 0 means disabled
	// - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
	// - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
	// If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
	// for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
	ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
	// List of IP addresses and IP ranges allowed to send the proxy header.
	// If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
	// connection will be accepted and the header will be ignored.
	// If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
	// connection will be rejected.
	ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
	// Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
	// If you define an HTTP URL it will be invoked using a `GET` request.
	// Please note that SFTPGo services may not yet be available when this hook is run.
	// Leave empty to disable.
	StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
	// Absolute path to an external program or an HTTP URL to invoke after a user connects
	// and before they try to login. It allows you to reject the connection based on the source
	// ip address. Leave empty to disable.
	PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
	// Absolute path to an external program or an HTTP URL to invoke after an SSH/FTP connection ends.
	// Leave empty to disable.
	PostDisconnectHook string `json:"post_disconnect_hook" mapstructure:"post_disconnect_hook"`
	// Absolute path to an external program or an HTTP URL to invoke after a data retention check completes.
	// Leave empty to disable.
	DataRetentionHook string `json:"data_retention_hook" mapstructure:"data_retention_hook"`
	// Maximum number of concurrent client connections. 0 means unlimited
	MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
	// Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
	MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
	// Defines the status of the global allow list. 0 means disabled, 1 enabled.
	// If enabled, only the listed IPs/networks can access the configured services, all other
	// client connections will be dropped before they even try to authenticate.
	// Ensure to enable this setting only after adding some allowed ip/networks from the WebAdmin/REST API
	AllowListStatus int `json:"allowlist_status" mapstructure:"allowlist_status"`
	// Allow users on this instance to use other users/virtual folders on this instance as storage backend.
	// Enable this setting if you know what you are doing.
	AllowSelfConnections int `json:"allow_self_connections" mapstructure:"allow_self_connections"`
	// Defender configuration
	DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
	// Rate limiter configurations
	RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
	// IdleTimeout converted to a duration, set by Initialize
	idleTimeoutAsDuration time.Duration
	// set to 2 minutes by Initialize
	idleLoginTimeout time.Duration
	// set by Initialize if the defender is enabled, nil otherwise
	defender Defender
	// set by Initialize if the allow list is enabled, nil otherwise
	allowList *dataprovider.IPList
	// set by Initialize if at least a rate limiter is configured, nil otherwise
	rateLimitersList *dataprovider.IPList
}
  523. // IsAtomicUploadEnabled returns true if atomic upload is enabled
  524. func (c *Configuration) IsAtomicUploadEnabled() bool {
  525. return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
  526. }
  527. // GetProxyListener returns a wrapper for the given listener that supports the
  528. // HAProxy Proxy Protocol
  529. func (c *Configuration) GetProxyListener(listener net.Listener) (*proxyproto.Listener, error) {
  530. var err error
  531. if c.ProxyProtocol > 0 {
  532. var policyFunc func(upstream net.Addr) (proxyproto.Policy, error)
  533. if c.ProxyProtocol == 1 && len(c.ProxyAllowed) > 0 {
  534. policyFunc, err = proxyproto.LaxWhiteListPolicy(c.ProxyAllowed)
  535. if err != nil {
  536. return nil, err
  537. }
  538. }
  539. if c.ProxyProtocol == 2 {
  540. if len(c.ProxyAllowed) == 0 {
  541. policyFunc = func(upstream net.Addr) (proxyproto.Policy, error) {
  542. return proxyproto.REQUIRE, nil
  543. }
  544. } else {
  545. policyFunc, err = proxyproto.StrictWhiteListPolicy(c.ProxyAllowed)
  546. if err != nil {
  547. return nil, err
  548. }
  549. }
  550. }
  551. return &proxyproto.Listener{
  552. Listener: listener,
  553. Policy: policyFunc,
  554. ReadHeaderTimeout: 10 * time.Second,
  555. }, nil
  556. }
  557. return nil, errors.New("proxy protocol not configured")
  558. }
  559. // GetRateLimitersStatus returns the rate limiters status
  560. func (c *Configuration) GetRateLimitersStatus() (bool, []string) {
  561. enabled := false
  562. var protocols []string
  563. for _, rlCfg := range c.RateLimitersConfig {
  564. if rlCfg.isEnabled() {
  565. enabled = true
  566. protocols = append(protocols, rlCfg.Protocols...)
  567. }
  568. }
  569. return enabled, util.RemoveDuplicates(protocols, false)
  570. }
  571. // IsAllowListEnabled returns true if the global allow list is enabled
  572. func (c *Configuration) IsAllowListEnabled() bool {
  573. return c.AllowListStatus > 0
  574. }
  575. // ExecuteStartupHook runs the startup hook if defined
  576. func (c *Configuration) ExecuteStartupHook() error {
  577. if c.StartupHook == "" {
  578. return nil
  579. }
  580. if strings.HasPrefix(c.StartupHook, "http") {
  581. var url *url.URL
  582. url, err := url.Parse(c.StartupHook)
  583. if err != nil {
  584. logger.Warn(logSender, "", "Invalid startup hook %#v: %v", c.StartupHook, err)
  585. return err
  586. }
  587. startTime := time.Now()
  588. resp, err := httpclient.RetryableGet(url.String())
  589. if err != nil {
  590. logger.Warn(logSender, "", "Error executing startup hook: %v", err)
  591. return err
  592. }
  593. defer resp.Body.Close()
  594. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
  595. return nil
  596. }
  597. if !filepath.IsAbs(c.StartupHook) {
  598. err := fmt.Errorf("invalid startup hook %#v", c.StartupHook)
  599. logger.Warn(logSender, "", "Invalid startup hook %#v", c.StartupHook)
  600. return err
  601. }
  602. startTime := time.Now()
  603. timeout, env, args := command.GetConfig(c.StartupHook, command.HookStartup)
  604. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  605. defer cancel()
  606. cmd := exec.CommandContext(ctx, c.StartupHook, args...)
  607. cmd.Env = env
  608. err := cmd.Run()
  609. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, error: %v", time.Since(startTime), err)
  610. return nil
  611. }
  612. func (c *Configuration) executePostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  613. startNewHook()
  614. defer hookEnded()
  615. ipAddr := util.GetIPFromRemoteAddress(remoteAddr)
  616. connDuration := int64(time.Since(connectionTime) / time.Millisecond)
  617. if strings.HasPrefix(c.PostDisconnectHook, "http") {
  618. var url *url.URL
  619. url, err := url.Parse(c.PostDisconnectHook)
  620. if err != nil {
  621. logger.Warn(protocol, connID, "Invalid post disconnect hook %#v: %v", c.PostDisconnectHook, err)
  622. return
  623. }
  624. q := url.Query()
  625. q.Add("ip", ipAddr)
  626. q.Add("protocol", protocol)
  627. q.Add("username", username)
  628. q.Add("connection_duration", strconv.FormatInt(connDuration, 10))
  629. url.RawQuery = q.Encode()
  630. startTime := time.Now()
  631. resp, err := httpclient.RetryableGet(url.String())
  632. respCode := 0
  633. if err == nil {
  634. respCode = resp.StatusCode
  635. resp.Body.Close()
  636. }
  637. logger.Debug(protocol, connID, "Post disconnect hook response code: %v, elapsed: %v, err: %v",
  638. respCode, time.Since(startTime), err)
  639. return
  640. }
  641. if !filepath.IsAbs(c.PostDisconnectHook) {
  642. logger.Debug(protocol, connID, "invalid post disconnect hook %#v", c.PostDisconnectHook)
  643. return
  644. }
  645. timeout, env, args := command.GetConfig(c.PostDisconnectHook, command.HookPostDisconnect)
  646. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  647. defer cancel()
  648. startTime := time.Now()
  649. cmd := exec.CommandContext(ctx, c.PostDisconnectHook, args...)
  650. cmd.Env = append(env,
  651. fmt.Sprintf("SFTPGO_CONNECTION_IP=%v", ipAddr),
  652. fmt.Sprintf("SFTPGO_CONNECTION_USERNAME=%v", username),
  653. fmt.Sprintf("SFTPGO_CONNECTION_DURATION=%v", connDuration),
  654. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%v", protocol))
  655. err := cmd.Run()
  656. logger.Debug(protocol, connID, "Post disconnect hook executed, elapsed: %v error: %v", time.Since(startTime), err)
  657. }
  658. func (c *Configuration) checkPostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  659. if c.PostDisconnectHook == "" {
  660. return
  661. }
  662. if !util.Contains(disconnHookProtocols, protocol) {
  663. return
  664. }
  665. go c.executePostDisconnectHook(remoteAddr, protocol, username, connID, connectionTime)
  666. }
  667. // ExecutePostConnectHook executes the post connect hook if defined
  668. func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
  669. if c.PostConnectHook == "" {
  670. return nil
  671. }
  672. if strings.HasPrefix(c.PostConnectHook, "http") {
  673. var url *url.URL
  674. url, err := url.Parse(c.PostConnectHook)
  675. if err != nil {
  676. logger.Warn(protocol, "", "Login from ip %#v denied, invalid post connect hook %#v: %v",
  677. ipAddr, c.PostConnectHook, err)
  678. return err
  679. }
  680. q := url.Query()
  681. q.Add("ip", ipAddr)
  682. q.Add("protocol", protocol)
  683. url.RawQuery = q.Encode()
  684. resp, err := httpclient.RetryableGet(url.String())
  685. if err != nil {
  686. logger.Warn(protocol, "", "Login from ip %#v denied, error executing post connect hook: %v", ipAddr, err)
  687. return err
  688. }
  689. defer resp.Body.Close()
  690. if resp.StatusCode != http.StatusOK {
  691. logger.Warn(protocol, "", "Login from ip %#v denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
  692. return errUnexpectedHTTResponse
  693. }
  694. return nil
  695. }
  696. if !filepath.IsAbs(c.PostConnectHook) {
  697. err := fmt.Errorf("invalid post connect hook %#v", c.PostConnectHook)
  698. logger.Warn(protocol, "", "Login from ip %#v denied: %v", ipAddr, err)
  699. return err
  700. }
  701. timeout, env, args := command.GetConfig(c.PostConnectHook, command.HookPostConnect)
  702. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  703. defer cancel()
  704. cmd := exec.CommandContext(ctx, c.PostConnectHook, args...)
  705. cmd.Env = append(env,
  706. fmt.Sprintf("SFTPGO_CONNECTION_IP=%v", ipAddr),
  707. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%v", protocol))
  708. err := cmd.Run()
  709. if err != nil {
  710. logger.Warn(protocol, "", "Login from ip %#v denied, connect hook error: %v", ipAddr, err)
  711. }
  712. return err
  713. }
  // SSHConnection defines an ssh connection.
// Each SSH connection can open several channels for SFTP or SSH commands
type SSHConnection struct {
	// id is the connection identifier returned by GetID
	id string
	// conn is the underlying network connection, closed by Close
	conn net.Conn
	// lastActivity holds the time of the last activity as Unix nanoseconds
	lastActivity atomic.Int64
}
  721. // NewSSHConnection returns a new SSHConnection
  722. func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
  723. c := &SSHConnection{
  724. id: id,
  725. conn: conn,
  726. }
  727. c.lastActivity.Store(time.Now().UnixNano())
  728. return c
  729. }
  730. // GetID returns the ID for this SSHConnection
  731. func (c *SSHConnection) GetID() string {
  732. return c.id
  733. }
  734. // UpdateLastActivity updates last activity for this connection
  735. func (c *SSHConnection) UpdateLastActivity() {
  736. c.lastActivity.Store(time.Now().UnixNano())
  737. }
  738. // GetLastActivity returns the last connection activity
  739. func (c *SSHConnection) GetLastActivity() time.Time {
  740. return time.Unix(0, c.lastActivity.Load())
  741. }
  742. // Close closes the underlying network connection
  743. func (c *SSHConnection) Close() error {
  744. return c.conn.Close()
  745. }
  // ActiveConnections holds the current active connections with the associated transfers
type ActiveConnections struct {
	// clients contains both authenticated and established connections and the ones waiting
	// for authentication
	clients clientsMap
	// transfersCheckStatus is true while checkTransfers is running
	transfersCheckStatus atomic.Bool
	// the embedded lock guards the fields declared below
	sync.RWMutex
	// connections lists the active connections
	connections []ActiveConnection
	// mapping associates a connection ID with its index in connections
	mapping map[string]int
	// sshConnections lists the active SSH connections
	sshConnections []*SSHConnection
	// sshMapping associates an SSH connection ID with its index in sshConnections
	sshMapping map[string]int
	// perUserConns counts the open connections for each username
	perUserConns map[string]int
}
  759. // internal method, must be called within a locked block
  760. func (conns *ActiveConnections) addUserConnection(username string) {
  761. if username == "" {
  762. return
  763. }
  764. conns.perUserConns[username]++
  765. }
  766. // internal method, must be called within a locked block
  767. func (conns *ActiveConnections) removeUserConnection(username string) {
  768. if username == "" {
  769. return
  770. }
  771. if val, ok := conns.perUserConns[username]; ok {
  772. conns.perUserConns[username]--
  773. if val > 1 {
  774. return
  775. }
  776. delete(conns.perUserConns, username)
  777. }
  778. }
  779. // GetActiveSessions returns the number of active sessions for the given username.
  780. // We return the open sessions for any protocol
  781. func (conns *ActiveConnections) GetActiveSessions(username string) int {
  782. conns.RLock()
  783. defer conns.RUnlock()
  784. return conns.perUserConns[username]
  785. }
  786. // Add adds a new connection to the active ones
  787. func (conns *ActiveConnections) Add(c ActiveConnection) error {
  788. conns.Lock()
  789. defer conns.Unlock()
  790. if username := c.GetUsername(); username != "" {
  791. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  792. if val := conns.perUserConns[username]; val >= maxSessions {
  793. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  794. }
  795. }
  796. conns.addUserConnection(username)
  797. }
  798. conns.mapping[c.GetID()] = len(conns.connections)
  799. conns.connections = append(conns.connections, c)
  800. metric.UpdateActiveConnectionsSize(len(conns.connections))
  801. logger.Debug(c.GetProtocol(), c.GetID(), "connection added, local address %q, remote address %q, num open connections: %d",
  802. c.GetLocalAddress(), c.GetRemoteAddress(), len(conns.connections))
  803. return nil
  804. }
  805. // Swap replaces an existing connection with the given one.
  806. // This method is useful if you have to change some connection details
  807. // for example for FTP is used to update the connection once the user
  808. // authenticates
  809. func (conns *ActiveConnections) Swap(c ActiveConnection) error {
  810. conns.Lock()
  811. defer conns.Unlock()
  812. if idx, ok := conns.mapping[c.GetID()]; ok {
  813. conn := conns.connections[idx]
  814. conns.removeUserConnection(conn.GetUsername())
  815. if username := c.GetUsername(); username != "" {
  816. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  817. if val, ok := conns.perUserConns[username]; ok && val >= maxSessions {
  818. conns.addUserConnection(conn.GetUsername())
  819. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  820. }
  821. }
  822. conns.addUserConnection(username)
  823. }
  824. err := conn.CloseFS()
  825. conns.connections[idx] = c
  826. logger.Debug(logSender, c.GetID(), "connection swapped, close fs error: %v", err)
  827. conn = nil
  828. return nil
  829. }
  830. return errors.New("connection to swap not found")
  831. }
  832. // Remove removes a connection from the active ones
  833. func (conns *ActiveConnections) Remove(connectionID string) {
  834. conns.Lock()
  835. defer conns.Unlock()
  836. if idx, ok := conns.mapping[connectionID]; ok {
  837. conn := conns.connections[idx]
  838. err := conn.CloseFS()
  839. lastIdx := len(conns.connections) - 1
  840. conns.connections[idx] = conns.connections[lastIdx]
  841. conns.connections[lastIdx] = nil
  842. conns.connections = conns.connections[:lastIdx]
  843. delete(conns.mapping, connectionID)
  844. if idx != lastIdx {
  845. conns.mapping[conns.connections[idx].GetID()] = idx
  846. }
  847. conns.removeUserConnection(conn.GetUsername())
  848. metric.UpdateActiveConnectionsSize(lastIdx)
  849. logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, local address %q, remote address %q close fs error: %v, num open connections: %d",
  850. conn.GetLocalAddress(), conn.GetRemoteAddress(), err, lastIdx)
  851. if conn.GetProtocol() == ProtocolFTP && conn.GetUsername() == "" && !util.Contains(ftpLoginCommands, conn.GetCommand()) {
  852. ip := util.GetIPFromRemoteAddress(conn.GetRemoteAddress())
  853. logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTryed, conn.GetProtocol(),
  854. dataprovider.ErrNoAuthTryed.Error())
  855. metric.AddNoAuthTryed()
  856. AddDefenderEvent(ip, ProtocolFTP, HostEventNoLoginTried)
  857. dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTryed, ip,
  858. conn.GetProtocol(), dataprovider.ErrNoAuthTryed)
  859. }
  860. Config.checkPostDisconnectHook(conn.GetRemoteAddress(), conn.GetProtocol(), conn.GetUsername(),
  861. conn.GetID(), conn.GetConnectionTime())
  862. return
  863. }
  864. logger.Debug(logSender, "", "connection id %q to remove not found!", connectionID)
  865. }
  866. // Close closes an active connection.
  867. // It returns true on success
  868. func (conns *ActiveConnections) Close(connectionID, role string) bool {
  869. conns.RLock()
  870. var result bool
  871. if idx, ok := conns.mapping[connectionID]; ok {
  872. c := conns.connections[idx]
  873. if role == "" || c.GetRole() == role {
  874. defer func(conn ActiveConnection) {
  875. err := conn.Disconnect()
  876. logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
  877. }(c)
  878. result = true
  879. }
  880. }
  881. conns.RUnlock()
  882. return result
  883. }
  884. // AddSSHConnection adds a new ssh connection to the active ones
  885. func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
  886. conns.Lock()
  887. defer conns.Unlock()
  888. conns.sshMapping[c.GetID()] = len(conns.sshConnections)
  889. conns.sshConnections = append(conns.sshConnections, c)
  890. logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %d", len(conns.sshConnections))
  891. }
  892. // RemoveSSHConnection removes a connection from the active ones
  893. func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
  894. conns.Lock()
  895. defer conns.Unlock()
  896. if idx, ok := conns.sshMapping[connectionID]; ok {
  897. lastIdx := len(conns.sshConnections) - 1
  898. conns.sshConnections[idx] = conns.sshConnections[lastIdx]
  899. conns.sshConnections[lastIdx] = nil
  900. conns.sshConnections = conns.sshConnections[:lastIdx]
  901. delete(conns.sshMapping, connectionID)
  902. if idx != lastIdx {
  903. conns.sshMapping[conns.sshConnections[idx].GetID()] = idx
  904. }
  905. logger.Debug(logSender, connectionID, "ssh connection removed, num open ssh connections: %d", lastIdx)
  906. return
  907. }
  908. logger.Warn(logSender, "", "ssh connection to remove with id %q not found!", connectionID)
  909. }
  910. func (conns *ActiveConnections) checkIdles() {
  911. conns.RLock()
  912. for _, sshConn := range conns.sshConnections {
  913. idleTime := time.Since(sshConn.GetLastActivity())
  914. if idleTime > Config.idleTimeoutAsDuration {
  915. // we close an SSH connection if it has no active connections associated
  916. idToMatch := fmt.Sprintf("_%s_", sshConn.GetID())
  917. toClose := true
  918. for _, conn := range conns.connections {
  919. if strings.Contains(conn.GetID(), idToMatch) {
  920. if time.Since(conn.GetLastActivity()) <= Config.idleTimeoutAsDuration {
  921. toClose = false
  922. break
  923. }
  924. }
  925. }
  926. if toClose {
  927. defer func(c *SSHConnection) {
  928. err := c.Close()
  929. logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
  930. time.Since(c.GetLastActivity()), err)
  931. }(sshConn)
  932. }
  933. }
  934. }
  935. for _, c := range conns.connections {
  936. idleTime := time.Since(c.GetLastActivity())
  937. isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
  938. if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
  939. defer func(conn ActiveConnection) {
  940. err := conn.Disconnect()
  941. logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %v, username: %#v close err: %v",
  942. time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
  943. }(c)
  944. }
  945. }
  946. conns.RUnlock()
  947. }
  948. func (conns *ActiveConnections) checkTransfers() {
  949. if conns.transfersCheckStatus.Load() {
  950. logger.Warn(logSender, "", "the previous transfer check is still running, skipping execution")
  951. return
  952. }
  953. conns.transfersCheckStatus.Store(true)
  954. defer conns.transfersCheckStatus.Store(false)
  955. conns.RLock()
  956. if len(conns.connections) < 2 {
  957. conns.RUnlock()
  958. return
  959. }
  960. var wg sync.WaitGroup
  961. logger.Debug(logSender, "", "start concurrent transfers check")
  962. // update the current size for transfers to monitors
  963. for _, c := range conns.connections {
  964. for _, t := range c.GetTransfers() {
  965. if t.HasSizeLimit {
  966. wg.Add(1)
  967. go func(transfer ConnectionTransfer, connID string) {
  968. defer wg.Done()
  969. transfersChecker.UpdateTransferCurrentSizes(transfer.ULSize, transfer.DLSize, transfer.ID, connID)
  970. }(t, c.GetID())
  971. }
  972. }
  973. }
  974. conns.RUnlock()
  975. logger.Debug(logSender, "", "waiting for the update of the transfers current size")
  976. wg.Wait()
  977. logger.Debug(logSender, "", "getting overquota transfers")
  978. overquotaTransfers := transfersChecker.GetOverquotaTransfers()
  979. logger.Debug(logSender, "", "number of overquota transfers: %v", len(overquotaTransfers))
  980. if len(overquotaTransfers) == 0 {
  981. return
  982. }
  983. conns.RLock()
  984. defer conns.RUnlock()
  985. for _, c := range conns.connections {
  986. for _, overquotaTransfer := range overquotaTransfers {
  987. if c.GetID() == overquotaTransfer.ConnID {
  988. logger.Info(logSender, c.GetID(), "user %#v is overquota, try to close transfer id %v",
  989. c.GetUsername(), overquotaTransfer.TransferID)
  990. var err error
  991. if overquotaTransfer.TransferType == TransferDownload {
  992. err = getReadQuotaExceededError(c.GetProtocol())
  993. } else {
  994. err = getQuotaExceededError(c.GetProtocol())
  995. }
  996. c.SignalTransferClose(overquotaTransfer.TransferID, err)
  997. }
  998. }
  999. }
  1000. logger.Debug(logSender, "", "transfers check completed")
  1001. }
  1002. // AddClientConnection stores a new client connection
  1003. func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
  1004. conns.clients.add(ipAddr)
  1005. }
  1006. // RemoveClientConnection removes a disconnected client from the tracked ones
  1007. func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
  1008. conns.clients.remove(ipAddr)
  1009. }
  1010. // GetClientConnections returns the total number of client connections
  1011. func (conns *ActiveConnections) GetClientConnections() int32 {
  1012. return conns.clients.getTotal()
  1013. }
  1014. // IsNewConnectionAllowed returns an error if the maximum number of concurrent allowed
  1015. // connections is exceeded or a whitelist is defined and the specified ipAddr is not listed
  1016. // or the service is shutting down
  1017. func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr, protocol string) error {
  1018. if isShuttingDown.Load() {
  1019. return ErrShuttingDown
  1020. }
  1021. if Config.allowList != nil {
  1022. isListed, _, err := Config.allowList.IsListed(ipAddr, protocol)
  1023. if err != nil {
  1024. logger.Error(logSender, "", "unable to query allow list, connection denied, ip %q, protocol %s, err: %v",
  1025. ipAddr, protocol, err)
  1026. return ErrConnectionDenied
  1027. }
  1028. if !isListed {
  1029. return ErrConnectionDenied
  1030. }
  1031. }
  1032. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  1033. return nil
  1034. }
  1035. if Config.MaxPerHostConnections > 0 {
  1036. if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
  1037. logger.Info(logSender, "", "active connections from %s %d/%d", ipAddr, total, Config.MaxPerHostConnections)
  1038. AddDefenderEvent(ipAddr, protocol, HostEventLimitExceeded)
  1039. return ErrConnectionDenied
  1040. }
  1041. }
  1042. if Config.MaxTotalConnections > 0 {
  1043. if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
  1044. logger.Info(logSender, "", "active client connections %d/%d", total, Config.MaxTotalConnections)
  1045. return ErrConnectionDenied
  1046. }
  1047. // on a single SFTP connection we could have multiple SFTP channels or commands
  1048. // so we check the estabilished connections too
  1049. conns.RLock()
  1050. defer conns.RUnlock()
  1051. if sess := len(conns.connections); sess >= Config.MaxTotalConnections {
  1052. logger.Info(logSender, "", "active client sessions %d/%d", sess, Config.MaxTotalConnections)
  1053. return ErrConnectionDenied
  1054. }
  1055. }
  1056. return nil
  1057. }
  1058. // GetStats returns stats for active connections
  1059. func (conns *ActiveConnections) GetStats(role string) []ConnectionStatus {
  1060. conns.RLock()
  1061. defer conns.RUnlock()
  1062. stats := make([]ConnectionStatus, 0, len(conns.connections))
  1063. node := dataprovider.GetNodeName()
  1064. for _, c := range conns.connections {
  1065. if role == "" || c.GetRole() == role {
  1066. stat := ConnectionStatus{
  1067. Username: c.GetUsername(),
  1068. ConnectionID: c.GetID(),
  1069. ClientVersion: c.GetClientVersion(),
  1070. RemoteAddress: c.GetRemoteAddress(),
  1071. ConnectionTime: util.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
  1072. LastActivity: util.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
  1073. Protocol: c.GetProtocol(),
  1074. Command: c.GetCommand(),
  1075. Transfers: c.GetTransfers(),
  1076. Node: node,
  1077. }
  1078. stats = append(stats, stat)
  1079. }
  1080. }
  1081. return stats
  1082. }
  // ConnectionStatus describes the status of an active connection
type ConnectionStatus struct {
	// Logged in username
	Username string `json:"username"`
	// Unique identifier for the connection
	ConnectionID string `json:"connection_id"`
	// Client's version string
	ClientVersion string `json:"client_version,omitempty"`
	// Remote address for this connection
	RemoteAddress string `json:"remote_address"`
	// Connection time as unix timestamp in milliseconds
	ConnectionTime int64 `json:"connection_time"`
	// Last activity as unix timestamp in milliseconds
	LastActivity int64 `json:"last_activity"`
	// Protocol for this connection
	Protocol string `json:"protocol"`
	// Active uploads/downloads
	Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
	// SSH command or WebDAV method
	Command string `json:"command,omitempty"`
	// Node identifier, omitted for single node installations
	Node string `json:"node,omitempty"`
}
  1106. // GetConnectionDuration returns the connection duration as string
  1107. func (c *ConnectionStatus) GetConnectionDuration() string {
  1108. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  1109. return util.GetDurationAsString(elapsed)
  1110. }
  1111. // GetConnectionInfo returns connection info.
  1112. // Protocol,Client Version and RemoteAddress are returned.
  1113. func (c *ConnectionStatus) GetConnectionInfo() string {
  1114. var result strings.Builder
  1115. result.WriteString(fmt.Sprintf("%v. Client: %#v From: %#v", c.Protocol, c.ClientVersion, c.RemoteAddress))
  1116. if c.Command == "" {
  1117. return result.String()
  1118. }
  1119. switch c.Protocol {
  1120. case ProtocolSSH, ProtocolFTP:
  1121. result.WriteString(fmt.Sprintf(". Command: %#v", c.Command))
  1122. case ProtocolWebDAV:
  1123. result.WriteString(fmt.Sprintf(". Method: %#v", c.Command))
  1124. }
  1125. return result.String()
  1126. }
  1127. // GetTransfersAsString returns the active transfers as string
  1128. func (c *ConnectionStatus) GetTransfersAsString() string {
  1129. result := ""
  1130. for _, t := range c.Transfers {
  1131. if result != "" {
  1132. result += ". "
  1133. }
  1134. result += t.getConnectionTransferAsString()
  1135. }
  1136. return result
  1137. }
  // ActiveQuotaScan defines an active quota scan for a user
type ActiveQuotaScan struct {
	// Username to which the quota scan refers
	Username string `json:"username"`
	// quota scan start time as unix timestamp in milliseconds
	StartTime int64 `json:"start_time"`
	// Role is compared with the requested role when listing the scans,
	// it is excluded from the JSON serialization
	Role string `json:"-"`
}
  // ActiveVirtualFolderQuotaScan defines an active quota scan for a virtual folder
type ActiveVirtualFolderQuotaScan struct {
	// Name of the folder to which the quota scan refers
	Name string `json:"name"`
	// Quota scan start time as unix timestamp in milliseconds
	StartTime int64 `json:"start_time"`
}
  // ActiveScans holds the active quota scans
type ActiveScans struct {
	// the embedded lock is taken by the methods that read or change the scans
	sync.RWMutex
	// UserScans lists the active quota scans for users
	UserScans []ActiveQuotaScan
	// FolderScans lists the active quota scans for virtual folders
	FolderScans []ActiveVirtualFolderQuotaScan
}
  1159. // GetUsersQuotaScans returns the active users quota scans
  1160. func (s *ActiveScans) GetUsersQuotaScans(role string) []ActiveQuotaScan {
  1161. s.RLock()
  1162. defer s.RUnlock()
  1163. scans := make([]ActiveQuotaScan, 0, len(s.UserScans))
  1164. for _, scan := range s.UserScans {
  1165. if role == "" || role == scan.Role {
  1166. scans = append(scans, ActiveQuotaScan{
  1167. Username: scan.Username,
  1168. StartTime: scan.StartTime,
  1169. })
  1170. }
  1171. }
  1172. return scans
  1173. }
  1174. // AddUserQuotaScan adds a user to the ones with active quota scans.
  1175. // Returns false if the user has a quota scan already running
  1176. func (s *ActiveScans) AddUserQuotaScan(username, role string) bool {
  1177. s.Lock()
  1178. defer s.Unlock()
  1179. for _, scan := range s.UserScans {
  1180. if scan.Username == username {
  1181. return false
  1182. }
  1183. }
  1184. s.UserScans = append(s.UserScans, ActiveQuotaScan{
  1185. Username: username,
  1186. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1187. Role: role,
  1188. })
  1189. return true
  1190. }
  1191. // RemoveUserQuotaScan removes a user from the ones with active quota scans.
  1192. // Returns false if the user has no active quota scans
  1193. func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
  1194. s.Lock()
  1195. defer s.Unlock()
  1196. for idx, scan := range s.UserScans {
  1197. if scan.Username == username {
  1198. lastIdx := len(s.UserScans) - 1
  1199. s.UserScans[idx] = s.UserScans[lastIdx]
  1200. s.UserScans = s.UserScans[:lastIdx]
  1201. return true
  1202. }
  1203. }
  1204. return false
  1205. }
  1206. // GetVFoldersQuotaScans returns the active quota scans for virtual folders
  1207. func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
  1208. s.RLock()
  1209. defer s.RUnlock()
  1210. scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
  1211. copy(scans, s.FolderScans)
  1212. return scans
  1213. }
  1214. // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
  1215. // Returns false if the folder has a quota scan already running
  1216. func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
  1217. s.Lock()
  1218. defer s.Unlock()
  1219. for _, scan := range s.FolderScans {
  1220. if scan.Name == folderName {
  1221. return false
  1222. }
  1223. }
  1224. s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
  1225. Name: folderName,
  1226. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1227. })
  1228. return true
  1229. }
  1230. // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
  1231. // Returns false if the folder has no active quota scans
  1232. func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
  1233. s.Lock()
  1234. defer s.Unlock()
  1235. for idx, scan := range s.FolderScans {
  1236. if scan.Name == folderName {
  1237. lastIdx := len(s.FolderScans) - 1
  1238. s.FolderScans[idx] = s.FolderScans[lastIdx]
  1239. s.FolderScans = s.FolderScans[:lastIdx]
  1240. return true
  1241. }
  1242. }
  1243. return false
  1244. }
  // MetadataCheck defines an active metadata check
type MetadataCheck struct {
	// Username to which the metadata check refers
	Username string `json:"username"`
	// check start time as unix timestamp in milliseconds
	StartTime int64 `json:"start_time"`
	// Role is compared with the requested role when listing the checks,
	// it is excluded from the JSON serialization
	Role string `json:"-"`
}
  // MetadataChecks holds the active metadata checks
type MetadataChecks struct {
	// the embedded lock is taken by the methods that read or change checks
	sync.RWMutex
	// checks lists the active metadata checks
	checks []MetadataCheck
}
  1258. // Get returns the active metadata checks
  1259. func (c *MetadataChecks) Get(role string) []MetadataCheck {
  1260. c.RLock()
  1261. defer c.RUnlock()
  1262. checks := make([]MetadataCheck, 0, len(c.checks))
  1263. for _, check := range c.checks {
  1264. if role == "" || role == check.Role {
  1265. checks = append(checks, MetadataCheck{
  1266. Username: check.Username,
  1267. StartTime: check.StartTime,
  1268. })
  1269. }
  1270. }
  1271. return checks
  1272. }
  1273. // Add adds a user to the ones with active metadata checks.
  1274. // Return false if a metadata check is already active for the specified user
  1275. func (c *MetadataChecks) Add(username, role string) bool {
  1276. c.Lock()
  1277. defer c.Unlock()
  1278. for idx := range c.checks {
  1279. if c.checks[idx].Username == username {
  1280. return false
  1281. }
  1282. }
  1283. c.checks = append(c.checks, MetadataCheck{
  1284. Username: username,
  1285. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1286. Role: role,
  1287. })
  1288. return true
  1289. }
  1290. // Remove removes a user from the ones with active metadata checks
  1291. func (c *MetadataChecks) Remove(username string) bool {
  1292. c.Lock()
  1293. defer c.Unlock()
  1294. for idx := range c.checks {
  1295. if c.checks[idx].Username == username {
  1296. lastIdx := len(c.checks) - 1
  1297. c.checks[idx] = c.checks[lastIdx]
  1298. c.checks = c.checks[:lastIdx]
  1299. return true
  1300. }
  1301. }
  1302. return false
  1303. }