common.go 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package common defines code shared among file transfer packages and protocols
  15. package common
  16. import (
  17. "context"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. "path/filepath"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/pires/go-proxyproto"
  32. "github.com/drakkan/sftpgo/v2/internal/command"
  33. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  34. "github.com/drakkan/sftpgo/v2/internal/httpclient"
  35. "github.com/drakkan/sftpgo/v2/internal/logger"
  36. "github.com/drakkan/sftpgo/v2/internal/metric"
  37. "github.com/drakkan/sftpgo/v2/internal/plugin"
  38. "github.com/drakkan/sftpgo/v2/internal/util"
  39. "github.com/drakkan/sftpgo/v2/internal/vfs"
  40. )
  41. // constants
  42. const (
  43. logSender = "common"
  44. uploadLogSender = "Upload"
  45. downloadLogSender = "Download"
  46. renameLogSender = "Rename"
  47. rmdirLogSender = "Rmdir"
  48. mkdirLogSender = "Mkdir"
  49. symlinkLogSender = "Symlink"
  50. removeLogSender = "Remove"
  51. chownLogSender = "Chown"
  52. chmodLogSender = "Chmod"
  53. chtimesLogSender = "Chtimes"
  54. truncateLogSender = "Truncate"
  55. operationDownload = "download"
  56. operationUpload = "upload"
  57. operationFirstDownload = "first-download"
  58. operationFirstUpload = "first-upload"
  59. operationDelete = "delete"
  60. // Pre-download action name
  61. OperationPreDownload = "pre-download"
  62. // Pre-upload action name
  63. OperationPreUpload = "pre-upload"
  64. operationPreDelete = "pre-delete"
  65. operationRename = "rename"
  66. operationMkdir = "mkdir"
  67. operationRmdir = "rmdir"
  68. // SSH command action name
  69. OperationSSHCmd = "ssh_cmd"
  70. chtimesFormat = "2006-01-02T15:04:05" // YYYY-MM-DDTHH:MM:SS
  71. idleTimeoutCheckInterval = 3 * time.Minute
  72. periodicTimeoutCheckInterval = 1 * time.Minute
  73. )
  74. // Stat flags
  75. const (
  76. StatAttrUIDGID = 1
  77. StatAttrPerms = 2
  78. StatAttrTimes = 4
  79. StatAttrSize = 8
  80. )
  81. // Transfer types
  82. const (
  83. TransferUpload = iota
  84. TransferDownload
  85. )
  86. // Supported protocols
  87. const (
  88. ProtocolSFTP = "SFTP"
  89. ProtocolSCP = "SCP"
  90. ProtocolSSH = "SSH"
  91. ProtocolFTP = "FTP"
  92. ProtocolWebDAV = "DAV"
  93. ProtocolHTTP = "HTTP"
  94. ProtocolHTTPShare = "HTTPShare"
  95. ProtocolDataRetention = "DataRetention"
  96. ProtocolOIDC = "OIDC"
  97. protocolEventAction = "EventAction"
  98. )
  99. // Upload modes
  100. const (
  101. UploadModeStandard = iota
  102. UploadModeAtomic
  103. UploadModeAtomicWithResume
  104. )
  105. func init() {
  106. Connections.clients = clientsMap{
  107. clients: make(map[string]int),
  108. }
  109. Connections.perUserConns = make(map[string]int)
  110. Connections.mapping = make(map[string]int)
  111. Connections.sshMapping = make(map[string]int)
  112. }
  113. // errors definitions
  114. var (
  115. ErrPermissionDenied = errors.New("permission denied")
  116. ErrNotExist = errors.New("no such file or directory")
  117. ErrOpUnsupported = errors.New("operation unsupported")
  118. ErrGenericFailure = errors.New("failure")
  119. ErrQuotaExceeded = errors.New("denying write due to space limit")
  120. ErrReadQuotaExceeded = errors.New("denying read due to quota limit")
  121. ErrConnectionDenied = errors.New("you are not allowed to connect")
  122. ErrNoBinding = errors.New("no binding configured")
  123. ErrCrtRevoked = errors.New("your certificate has been revoked")
  124. ErrNoCredentials = errors.New("no credential provided")
  125. ErrInternalFailure = errors.New("internal failure")
  126. ErrTransferAborted = errors.New("transfer aborted")
  127. ErrShuttingDown = errors.New("the service is shutting down")
  128. errNoTransfer = errors.New("requested transfer not found")
  129. errTransferMismatch = errors.New("transfer mismatch")
  130. )
  131. var (
  132. // Config is the configuration for the supported protocols
  133. Config Configuration
  134. // Connections is the list of active connections
  135. Connections ActiveConnections
  136. // QuotaScans is the list of active quota scans
  137. QuotaScans ActiveScans
  138. // ActiveMetadataChecks holds the active metadata checks
  139. ActiveMetadataChecks MetadataChecks
  140. transfersChecker TransfersChecker
  141. supportedProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV,
  142. ProtocolHTTP, ProtocolHTTPShare, ProtocolOIDC}
  143. disconnHookProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP}
  144. // the map key is the protocol, for each protocol we can have multiple rate limiters
  145. rateLimiters map[string][]*rateLimiter
  146. isShuttingDown atomic.Bool
  147. )
  148. // Initialize sets the common configuration
  149. func Initialize(c Configuration, isShared int) error {
  150. isShuttingDown.Store(false)
  151. Config = c
  152. Config.Actions.ExecuteOn = util.RemoveDuplicates(Config.Actions.ExecuteOn, true)
  153. Config.Actions.ExecuteSync = util.RemoveDuplicates(Config.Actions.ExecuteSync, true)
  154. Config.ProxyAllowed = util.RemoveDuplicates(Config.ProxyAllowed, true)
  155. Config.idleLoginTimeout = 2 * time.Minute
  156. Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
  157. startPeriodicChecks(periodicTimeoutCheckInterval)
  158. Config.defender = nil
  159. Config.whitelist = nil
  160. rateLimiters = make(map[string][]*rateLimiter)
  161. for _, rlCfg := range c.RateLimitersConfig {
  162. if rlCfg.isEnabled() {
  163. if err := rlCfg.validate(); err != nil {
  164. return fmt.Errorf("rate limiters initialization error: %w", err)
  165. }
  166. allowList, err := util.ParseAllowedIPAndRanges(rlCfg.AllowList)
  167. if err != nil {
  168. return fmt.Errorf("unable to parse rate limiter allow list %v: %v", rlCfg.AllowList, err)
  169. }
  170. rateLimiter := rlCfg.getLimiter()
  171. rateLimiter.allowList = allowList
  172. for _, protocol := range rlCfg.Protocols {
  173. rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
  174. }
  175. }
  176. }
  177. if c.DefenderConfig.Enabled {
  178. if !util.Contains(supportedDefenderDrivers, c.DefenderConfig.Driver) {
  179. return fmt.Errorf("unsupported defender driver %#v", c.DefenderConfig.Driver)
  180. }
  181. var defender Defender
  182. var err error
  183. switch c.DefenderConfig.Driver {
  184. case DefenderDriverProvider:
  185. defender, err = newDBDefender(&c.DefenderConfig)
  186. default:
  187. defender, err = newInMemoryDefender(&c.DefenderConfig)
  188. }
  189. if err != nil {
  190. return fmt.Errorf("defender initialization error: %v", err)
  191. }
  192. logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
  193. Config.defender = defender
  194. }
  195. if c.WhiteListFile != "" {
  196. whitelist := &whitelist{
  197. fileName: c.WhiteListFile,
  198. }
  199. if err := whitelist.reload(); err != nil {
  200. return fmt.Errorf("whitelist initialization error: %w", err)
  201. }
  202. logger.Info(logSender, "", "whitelist initialized from file: %#v", c.WhiteListFile)
  203. Config.whitelist = whitelist
  204. }
  205. vfs.SetTempPath(c.TempPath)
  206. dataprovider.SetTempPath(c.TempPath)
  207. vfs.SetAllowSelfConnections(c.AllowSelfConnections)
  208. dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
  209. transfersChecker = getTransfersChecker(isShared)
  210. return nil
  211. }
  212. // CheckClosing returns an error if the service is closing
  213. func CheckClosing() error {
  214. if isShuttingDown.Load() {
  215. return ErrShuttingDown
  216. }
  217. return nil
  218. }
  219. // WaitForTransfers waits, for the specified grace time, for currently ongoing
  220. // client-initiated transfer sessions to completes.
  221. // A zero graceTime means no wait
  222. func WaitForTransfers(graceTime int) {
  223. if graceTime == 0 {
  224. return
  225. }
  226. if isShuttingDown.Swap(true) {
  227. return
  228. }
  229. if activeHooks.Load() == 0 && getActiveConnections() == 0 {
  230. return
  231. }
  232. graceTimer := time.NewTimer(time.Duration(graceTime) * time.Second)
  233. ticker := time.NewTicker(3 * time.Second)
  234. for {
  235. select {
  236. case <-ticker.C:
  237. hooks := activeHooks.Load()
  238. logger.Info(logSender, "", "active hooks: %d", hooks)
  239. if hooks == 0 && getActiveConnections() == 0 {
  240. logger.Info(logSender, "", "no more active connections, graceful shutdown")
  241. ticker.Stop()
  242. graceTimer.Stop()
  243. return
  244. }
  245. case <-graceTimer.C:
  246. logger.Info(logSender, "", "grace time expired, hard shutdown")
  247. ticker.Stop()
  248. return
  249. }
  250. }
  251. }
  252. // getActiveConnections returns the number of connections with active transfers
  253. func getActiveConnections() int {
  254. var activeConns int
  255. Connections.RLock()
  256. for _, c := range Connections.connections {
  257. if len(c.GetTransfers()) > 0 {
  258. activeConns++
  259. }
  260. }
  261. Connections.RUnlock()
  262. logger.Info(logSender, "", "number of connections with active transfers: %d", activeConns)
  263. return activeConns
  264. }
  265. // LimitRate blocks until all the configured rate limiters
  266. // allow one event to happen.
  267. // It returns an error if the time to wait exceeds the max
  268. // allowed delay
  269. func LimitRate(protocol, ip string) (time.Duration, error) {
  270. for _, limiter := range rateLimiters[protocol] {
  271. if delay, err := limiter.Wait(ip); err != nil {
  272. logger.Debug(logSender, "", "protocol %v ip %v: %v", protocol, ip, err)
  273. return delay, err
  274. }
  275. }
  276. return 0, nil
  277. }
  278. // Reload reloads the whitelist, the IP filter plugin and the defender's block and safe lists
  279. func Reload() error {
  280. plugin.Handler.ReloadFilter()
  281. var errWithelist error
  282. if Config.whitelist != nil {
  283. errWithelist = Config.whitelist.reload()
  284. }
  285. if Config.defender == nil {
  286. return errWithelist
  287. }
  288. if err := Config.defender.Reload(); err != nil {
  289. return err
  290. }
  291. return errWithelist
  292. }
  293. // IsBanned returns true if the specified IP address is banned
  294. func IsBanned(ip string) bool {
  295. if plugin.Handler.IsIPBanned(ip) {
  296. return true
  297. }
  298. if Config.defender == nil {
  299. return false
  300. }
  301. return Config.defender.IsBanned(ip)
  302. }
  303. // GetDefenderBanTime returns the ban time for the given IP
  304. // or nil if the IP is not banned or the defender is disabled
  305. func GetDefenderBanTime(ip string) (*time.Time, error) {
  306. if Config.defender == nil {
  307. return nil, nil
  308. }
  309. return Config.defender.GetBanTime(ip)
  310. }
  311. // GetDefenderHosts returns hosts that are banned or for which some violations have been detected
  312. func GetDefenderHosts() ([]dataprovider.DefenderEntry, error) {
  313. if Config.defender == nil {
  314. return nil, nil
  315. }
  316. return Config.defender.GetHosts()
  317. }
  318. // GetDefenderHost returns a defender host by ip, if any
  319. func GetDefenderHost(ip string) (dataprovider.DefenderEntry, error) {
  320. if Config.defender == nil {
  321. return dataprovider.DefenderEntry{}, errors.New("defender is disabled")
  322. }
  323. return Config.defender.GetHost(ip)
  324. }
  325. // DeleteDefenderHost removes the specified IP address from the defender lists
  326. func DeleteDefenderHost(ip string) bool {
  327. if Config.defender == nil {
  328. return false
  329. }
  330. return Config.defender.DeleteHost(ip)
  331. }
  332. // GetDefenderScore returns the score for the given IP
  333. func GetDefenderScore(ip string) (int, error) {
  334. if Config.defender == nil {
  335. return 0, nil
  336. }
  337. return Config.defender.GetScore(ip)
  338. }
  339. // AddDefenderEvent adds the specified defender event for the given IP
  340. func AddDefenderEvent(ip string, event HostEvent) {
  341. if Config.defender == nil {
  342. return
  343. }
  344. Config.defender.AddEvent(ip, event)
  345. }
  346. func startPeriodicChecks(duration time.Duration) {
  347. startEventScheduler()
  348. spec := fmt.Sprintf("@every %s", duration)
  349. _, err := eventScheduler.AddFunc(spec, Connections.checkTransfers)
  350. util.PanicOnError(err)
  351. logger.Info(logSender, "", "scheduled overquota transfers check, schedule %q", spec)
  352. if Config.IdleTimeout > 0 {
  353. ratio := idleTimeoutCheckInterval / periodicTimeoutCheckInterval
  354. spec = fmt.Sprintf("@every %s", duration*ratio)
  355. _, err = eventScheduler.AddFunc(spec, Connections.checkIdles)
  356. util.PanicOnError(err)
  357. logger.Info(logSender, "", "scheduled idle connections check, schedule %q", spec)
  358. }
  359. }
  360. // ActiveTransfer defines the interface for the current active transfers
  361. type ActiveTransfer interface {
  362. GetID() int64
  363. GetType() int
  364. GetSize() int64
  365. GetDownloadedSize() int64
  366. GetUploadedSize() int64
  367. GetVirtualPath() string
  368. GetStartTime() time.Time
  369. SignalClose(err error)
  370. Truncate(fsPath string, size int64) (int64, error)
  371. GetRealFsPath(fsPath string) string
  372. SetTimes(fsPath string, atime time.Time, mtime time.Time) bool
  373. GetTruncatedSize() int64
  374. HasSizeLimit() bool
  375. }
  376. // ActiveConnection defines the interface for the current active connections
  377. type ActiveConnection interface {
  378. GetID() string
  379. GetUsername() string
  380. GetRole() string
  381. GetMaxSessions() int
  382. GetLocalAddress() string
  383. GetRemoteAddress() string
  384. GetClientVersion() string
  385. GetProtocol() string
  386. GetConnectionTime() time.Time
  387. GetLastActivity() time.Time
  388. GetCommand() string
  389. Disconnect() error
  390. AddTransfer(t ActiveTransfer)
  391. RemoveTransfer(t ActiveTransfer)
  392. GetTransfers() []ConnectionTransfer
  393. SignalTransferClose(transferID int64, err error)
  394. CloseFS() error
  395. }
  396. // StatAttributes defines the attributes for set stat commands
  397. type StatAttributes struct {
  398. Mode os.FileMode
  399. Atime time.Time
  400. Mtime time.Time
  401. UID int
  402. GID int
  403. Flags int
  404. Size int64
  405. }
  406. // ConnectionTransfer defines the trasfer details to expose
  407. type ConnectionTransfer struct {
  408. ID int64 `json:"-"`
  409. OperationType string `json:"operation_type"`
  410. StartTime int64 `json:"start_time"`
  411. Size int64 `json:"size"`
  412. VirtualPath string `json:"path"`
  413. HasSizeLimit bool `json:"-"`
  414. ULSize int64 `json:"-"`
  415. DLSize int64 `json:"-"`
  416. }
  417. func (t *ConnectionTransfer) getConnectionTransferAsString() string {
  418. result := ""
  419. switch t.OperationType {
  420. case operationUpload:
  421. result += "UL "
  422. case operationDownload:
  423. result += "DL "
  424. }
  425. result += fmt.Sprintf("%q ", t.VirtualPath)
  426. if t.Size > 0 {
  427. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(t.StartTime))
  428. speed := float64(t.Size) / float64(util.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  429. result += fmt.Sprintf("Size: %s Elapsed: %s Speed: \"%.1f KB/s\"", util.ByteCountIEC(t.Size),
  430. util.GetDurationAsString(elapsed), speed)
  431. }
  432. return result
  433. }
  434. type whitelist struct {
  435. fileName string
  436. sync.RWMutex
  437. list HostList
  438. }
  439. func (l *whitelist) reload() error {
  440. list, err := loadHostListFromFile(l.fileName)
  441. if err != nil {
  442. return err
  443. }
  444. if list == nil {
  445. return errors.New("cannot accept a nil whitelist")
  446. }
  447. l.Lock()
  448. defer l.Unlock()
  449. l.list = *list
  450. return nil
  451. }
  452. func (l *whitelist) isAllowed(ip string) bool {
  453. l.RLock()
  454. defer l.RUnlock()
  455. return l.list.isListed(ip)
  456. }
  457. // Configuration defines configuration parameters common to all supported protocols
  458. type Configuration struct {
  459. // Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
  460. // 0 means disabled
  461. IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
  462. // UploadMode 0 means standard, the files are uploaded directly to the requested path.
  463. // 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
  464. // when the client ends the upload. Atomic mode avoid problems such as a web server that
  465. // serves partial files when the files are being uploaded.
  466. // In atomic mode if there is an upload error the temporary file is deleted and so the requested
  467. // upload path will not contain a partial file.
  468. // 2 means atomic with resume support: as atomic but if there is an upload error the temporary
  469. // file is renamed to the requested path and not deleted, this way a client can reconnect and resume
  470. // the upload.
  471. UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
  472. // Actions to execute for SFTP file operations and SSH commands
  473. Actions ProtocolActions `json:"actions" mapstructure:"actions"`
  474. // SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
  475. // 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
  476. // 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group are
  477. // silently ignored for cloud based filesystem such as S3, GCS, Azure Blob. Requests for changing
  478. // modification times are ignored for cloud based filesystem if they are not supported.
  479. SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
  480. // TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
  481. // If you set this option you must make sure that the defined path exists, is accessible for writing
  482. // by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise
  483. // the renaming for atomic uploads will become a copy and therefore may take a long time.
  484. // The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
  485. TempPath string `json:"temp_path" mapstructure:"temp_path"`
  486. // Support for HAProxy PROXY protocol.
  487. // If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGNIX, you can enable
  488. // the proxy protocol. It provides a convenient way to safely transport connection information
  489. // such as a client's address across multiple layers of NAT or TCP proxies to get the real
  490. // client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
  491. // - 0 means disabled
  492. // - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
  493. // - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
  494. // If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
  495. // for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
  496. ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
  497. // List of IP addresses and IP ranges allowed to send the proxy header.
  498. // If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
  499. // connection will be accepted and the header will be ignored.
  500. // If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
  501. // connection will be rejected.
  502. ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
  503. // Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
  504. // If you define an HTTP URL it will be invoked using a `GET` request.
  505. // Please note that SFTPGo services may not yet be available when this hook is run.
  506. // Leave empty do disable.
  507. StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
  508. // Absolute path to an external program or an HTTP URL to invoke after a user connects
  509. // and before he tries to login. It allows you to reject the connection based on the source
  510. // ip address. Leave empty do disable.
  511. PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
  512. // Absolute path to an external program or an HTTP URL to invoke after an SSH/FTP connection ends.
  513. // Leave empty do disable.
  514. PostDisconnectHook string `json:"post_disconnect_hook" mapstructure:"post_disconnect_hook"`
  515. // Absolute path to an external program or an HTTP URL to invoke after a data retention check completes.
  516. // Leave empty do disable.
  517. DataRetentionHook string `json:"data_retention_hook" mapstructure:"data_retention_hook"`
  518. // Maximum number of concurrent client connections. 0 means unlimited
  519. MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
  520. // Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
  521. MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
  522. // Path to a file containing a list of IP addresses and/or networks to allow.
  523. // Only the listed IPs/networks can access the configured services, all other client connections
  524. // will be dropped before they even try to authenticate.
  525. WhiteListFile string `json:"whitelist_file" mapstructure:"whitelist_file"`
  526. // Allow users on this instance to use other users/virtual folders on this instance as storage backend.
  527. // Enable this setting if you know what you are doing.
  528. AllowSelfConnections int `json:"allow_self_connections" mapstructure:"allow_self_connections"`
  529. // Defender configuration
  530. DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
  531. // Rate limiter configurations
  532. RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
  533. idleTimeoutAsDuration time.Duration
  534. idleLoginTimeout time.Duration
  535. defender Defender
  536. whitelist *whitelist
  537. }
  538. // IsAtomicUploadEnabled returns true if atomic upload is enabled
  539. func (c *Configuration) IsAtomicUploadEnabled() bool {
  540. return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
  541. }
  542. // GetProxyListener returns a wrapper for the given listener that supports the
  543. // HAProxy Proxy Protocol
  544. func (c *Configuration) GetProxyListener(listener net.Listener) (*proxyproto.Listener, error) {
  545. var err error
  546. if c.ProxyProtocol > 0 {
  547. var policyFunc func(upstream net.Addr) (proxyproto.Policy, error)
  548. if c.ProxyProtocol == 1 && len(c.ProxyAllowed) > 0 {
  549. policyFunc, err = proxyproto.LaxWhiteListPolicy(c.ProxyAllowed)
  550. if err != nil {
  551. return nil, err
  552. }
  553. }
  554. if c.ProxyProtocol == 2 {
  555. if len(c.ProxyAllowed) == 0 {
  556. policyFunc = func(upstream net.Addr) (proxyproto.Policy, error) {
  557. return proxyproto.REQUIRE, nil
  558. }
  559. } else {
  560. policyFunc, err = proxyproto.StrictWhiteListPolicy(c.ProxyAllowed)
  561. if err != nil {
  562. return nil, err
  563. }
  564. }
  565. }
  566. return &proxyproto.Listener{
  567. Listener: listener,
  568. Policy: policyFunc,
  569. ReadHeaderTimeout: 10 * time.Second,
  570. }, nil
  571. }
  572. return nil, errors.New("proxy protocol not configured")
  573. }
  574. // ExecuteStartupHook runs the startup hook if defined
  575. func (c *Configuration) ExecuteStartupHook() error {
  576. if c.StartupHook == "" {
  577. return nil
  578. }
  579. if strings.HasPrefix(c.StartupHook, "http") {
  580. var url *url.URL
  581. url, err := url.Parse(c.StartupHook)
  582. if err != nil {
  583. logger.Warn(logSender, "", "Invalid startup hook %#v: %v", c.StartupHook, err)
  584. return err
  585. }
  586. startTime := time.Now()
  587. resp, err := httpclient.RetryableGet(url.String())
  588. if err != nil {
  589. logger.Warn(logSender, "", "Error executing startup hook: %v", err)
  590. return err
  591. }
  592. defer resp.Body.Close()
  593. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
  594. return nil
  595. }
  596. if !filepath.IsAbs(c.StartupHook) {
  597. err := fmt.Errorf("invalid startup hook %#v", c.StartupHook)
  598. logger.Warn(logSender, "", "Invalid startup hook %#v", c.StartupHook)
  599. return err
  600. }
  601. startTime := time.Now()
  602. timeout, env, args := command.GetConfig(c.StartupHook, command.HookStartup)
  603. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  604. defer cancel()
  605. cmd := exec.CommandContext(ctx, c.StartupHook, args...)
  606. cmd.Env = env
  607. err := cmd.Run()
  608. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, error: %v", time.Since(startTime), err)
  609. return nil
  610. }
  611. func (c *Configuration) executePostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  612. startNewHook()
  613. defer hookEnded()
  614. ipAddr := util.GetIPFromRemoteAddress(remoteAddr)
  615. connDuration := int64(time.Since(connectionTime) / time.Millisecond)
  616. if strings.HasPrefix(c.PostDisconnectHook, "http") {
  617. var url *url.URL
  618. url, err := url.Parse(c.PostDisconnectHook)
  619. if err != nil {
  620. logger.Warn(protocol, connID, "Invalid post disconnect hook %#v: %v", c.PostDisconnectHook, err)
  621. return
  622. }
  623. q := url.Query()
  624. q.Add("ip", ipAddr)
  625. q.Add("protocol", protocol)
  626. q.Add("username", username)
  627. q.Add("connection_duration", strconv.FormatInt(connDuration, 10))
  628. url.RawQuery = q.Encode()
  629. startTime := time.Now()
  630. resp, err := httpclient.RetryableGet(url.String())
  631. respCode := 0
  632. if err == nil {
  633. respCode = resp.StatusCode
  634. resp.Body.Close()
  635. }
  636. logger.Debug(protocol, connID, "Post disconnect hook response code: %v, elapsed: %v, err: %v",
  637. respCode, time.Since(startTime), err)
  638. return
  639. }
  640. if !filepath.IsAbs(c.PostDisconnectHook) {
  641. logger.Debug(protocol, connID, "invalid post disconnect hook %#v", c.PostDisconnectHook)
  642. return
  643. }
  644. timeout, env, args := command.GetConfig(c.PostDisconnectHook, command.HookPostDisconnect)
  645. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  646. defer cancel()
  647. startTime := time.Now()
  648. cmd := exec.CommandContext(ctx, c.PostDisconnectHook, args...)
  649. cmd.Env = append(env,
  650. fmt.Sprintf("SFTPGO_CONNECTION_IP=%v", ipAddr),
  651. fmt.Sprintf("SFTPGO_CONNECTION_USERNAME=%v", username),
  652. fmt.Sprintf("SFTPGO_CONNECTION_DURATION=%v", connDuration),
  653. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%v", protocol))
  654. err := cmd.Run()
  655. logger.Debug(protocol, connID, "Post disconnect hook executed, elapsed: %v error: %v", time.Since(startTime), err)
  656. }
  657. func (c *Configuration) checkPostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  658. if c.PostDisconnectHook == "" {
  659. return
  660. }
  661. if !util.Contains(disconnHookProtocols, protocol) {
  662. return
  663. }
  664. go c.executePostDisconnectHook(remoteAddr, protocol, username, connID, connectionTime)
  665. }
  666. // ExecutePostConnectHook executes the post connect hook if defined
  667. func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
  668. if c.PostConnectHook == "" {
  669. return nil
  670. }
  671. if strings.HasPrefix(c.PostConnectHook, "http") {
  672. var url *url.URL
  673. url, err := url.Parse(c.PostConnectHook)
  674. if err != nil {
  675. logger.Warn(protocol, "", "Login from ip %#v denied, invalid post connect hook %#v: %v",
  676. ipAddr, c.PostConnectHook, err)
  677. return err
  678. }
  679. q := url.Query()
  680. q.Add("ip", ipAddr)
  681. q.Add("protocol", protocol)
  682. url.RawQuery = q.Encode()
  683. resp, err := httpclient.RetryableGet(url.String())
  684. if err != nil {
  685. logger.Warn(protocol, "", "Login from ip %#v denied, error executing post connect hook: %v", ipAddr, err)
  686. return err
  687. }
  688. defer resp.Body.Close()
  689. if resp.StatusCode != http.StatusOK {
  690. logger.Warn(protocol, "", "Login from ip %#v denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
  691. return errUnexpectedHTTResponse
  692. }
  693. return nil
  694. }
  695. if !filepath.IsAbs(c.PostConnectHook) {
  696. err := fmt.Errorf("invalid post connect hook %#v", c.PostConnectHook)
  697. logger.Warn(protocol, "", "Login from ip %#v denied: %v", ipAddr, err)
  698. return err
  699. }
  700. timeout, env, args := command.GetConfig(c.PostConnectHook, command.HookPostConnect)
  701. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  702. defer cancel()
  703. cmd := exec.CommandContext(ctx, c.PostConnectHook, args...)
  704. cmd.Env = append(env,
  705. fmt.Sprintf("SFTPGO_CONNECTION_IP=%v", ipAddr),
  706. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%v", protocol))
  707. err := cmd.Run()
  708. if err != nil {
  709. logger.Warn(protocol, "", "Login from ip %#v denied, connect hook error: %v", ipAddr, err)
  710. }
  711. return err
  712. }
  713. // SSHConnection defines an ssh connection.
  714. // Each SSH connection can open several channels for SFTP or SSH commands
  715. type SSHConnection struct {
  716. id string
  717. conn net.Conn
  718. lastActivity atomic.Int64
  719. }
  720. // NewSSHConnection returns a new SSHConnection
  721. func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
  722. c := &SSHConnection{
  723. id: id,
  724. conn: conn,
  725. }
  726. c.lastActivity.Store(time.Now().UnixNano())
  727. return c
  728. }
  729. // GetID returns the ID for this SSHConnection
  730. func (c *SSHConnection) GetID() string {
  731. return c.id
  732. }
  733. // UpdateLastActivity updates last activity for this connection
  734. func (c *SSHConnection) UpdateLastActivity() {
  735. c.lastActivity.Store(time.Now().UnixNano())
  736. }
  737. // GetLastActivity returns the last connection activity
  738. func (c *SSHConnection) GetLastActivity() time.Time {
  739. return time.Unix(0, c.lastActivity.Load())
  740. }
  741. // Close closes the underlying network connection
  742. func (c *SSHConnection) Close() error {
  743. return c.conn.Close()
  744. }
  745. // ActiveConnections holds the currect active connections with the associated transfers
  746. type ActiveConnections struct {
  747. // clients contains both authenticated and estabilished connections and the ones waiting
  748. // for authentication
  749. clients clientsMap
  750. transfersCheckStatus atomic.Bool
  751. sync.RWMutex
  752. connections []ActiveConnection
  753. mapping map[string]int
  754. sshConnections []*SSHConnection
  755. sshMapping map[string]int
  756. perUserConns map[string]int
  757. }
  758. // internal method, must be called within a locked block
  759. func (conns *ActiveConnections) addUserConnection(username string) {
  760. if username == "" {
  761. return
  762. }
  763. conns.perUserConns[username]++
  764. }
  765. // internal method, must be called within a locked block
  766. func (conns *ActiveConnections) removeUserConnection(username string) {
  767. if username == "" {
  768. return
  769. }
  770. if val, ok := conns.perUserConns[username]; ok {
  771. conns.perUserConns[username]--
  772. if val > 1 {
  773. return
  774. }
  775. delete(conns.perUserConns, username)
  776. }
  777. }
  778. // GetActiveSessions returns the number of active sessions for the given username.
  779. // We return the open sessions for any protocol
  780. func (conns *ActiveConnections) GetActiveSessions(username string) int {
  781. conns.RLock()
  782. defer conns.RUnlock()
  783. return conns.perUserConns[username]
  784. }
  785. // Add adds a new connection to the active ones
  786. func (conns *ActiveConnections) Add(c ActiveConnection) error {
  787. conns.Lock()
  788. defer conns.Unlock()
  789. if username := c.GetUsername(); username != "" {
  790. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  791. if val := conns.perUserConns[username]; val >= maxSessions {
  792. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  793. }
  794. }
  795. conns.addUserConnection(username)
  796. }
  797. conns.mapping[c.GetID()] = len(conns.connections)
  798. conns.connections = append(conns.connections, c)
  799. metric.UpdateActiveConnectionsSize(len(conns.connections))
  800. logger.Debug(c.GetProtocol(), c.GetID(), "connection added, local address %q, remote address %q, num open connections: %d",
  801. c.GetLocalAddress(), c.GetRemoteAddress(), len(conns.connections))
  802. return nil
  803. }
  804. // Swap replaces an existing connection with the given one.
  805. // This method is useful if you have to change some connection details
  806. // for example for FTP is used to update the connection once the user
  807. // authenticates
  808. func (conns *ActiveConnections) Swap(c ActiveConnection) error {
  809. conns.Lock()
  810. defer conns.Unlock()
  811. if idx, ok := conns.mapping[c.GetID()]; ok {
  812. conn := conns.connections[idx]
  813. conns.removeUserConnection(conn.GetUsername())
  814. if username := c.GetUsername(); username != "" {
  815. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  816. if val, ok := conns.perUserConns[username]; ok && val >= maxSessions {
  817. conns.addUserConnection(conn.GetUsername())
  818. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  819. }
  820. }
  821. conns.addUserConnection(username)
  822. }
  823. err := conn.CloseFS()
  824. conns.connections[idx] = c
  825. logger.Debug(logSender, c.GetID(), "connection swapped, close fs error: %v", err)
  826. conn = nil
  827. return nil
  828. }
  829. return errors.New("connection to swap not found")
  830. }
  831. // Remove removes a connection from the active ones
  832. func (conns *ActiveConnections) Remove(connectionID string) {
  833. conns.Lock()
  834. defer conns.Unlock()
  835. if idx, ok := conns.mapping[connectionID]; ok {
  836. conn := conns.connections[idx]
  837. err := conn.CloseFS()
  838. lastIdx := len(conns.connections) - 1
  839. conns.connections[idx] = conns.connections[lastIdx]
  840. conns.connections[lastIdx] = nil
  841. conns.connections = conns.connections[:lastIdx]
  842. delete(conns.mapping, connectionID)
  843. if idx != lastIdx {
  844. conns.mapping[conns.connections[idx].GetID()] = idx
  845. }
  846. conns.removeUserConnection(conn.GetUsername())
  847. metric.UpdateActiveConnectionsSize(lastIdx)
  848. logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, local address %#v, remote address %#v close fs error: %v, num open connections: %v",
  849. conn.GetLocalAddress(), conn.GetRemoteAddress(), err, lastIdx)
  850. if conn.GetProtocol() == ProtocolFTP && conn.GetUsername() == "" {
  851. ip := util.GetIPFromRemoteAddress(conn.GetRemoteAddress())
  852. logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTryed, conn.GetProtocol(),
  853. dataprovider.ErrNoAuthTryed.Error())
  854. metric.AddNoAuthTryed()
  855. AddDefenderEvent(ip, HostEventNoLoginTried)
  856. dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTryed, ip,
  857. conn.GetProtocol(), dataprovider.ErrNoAuthTryed)
  858. }
  859. Config.checkPostDisconnectHook(conn.GetRemoteAddress(), conn.GetProtocol(), conn.GetUsername(),
  860. conn.GetID(), conn.GetConnectionTime())
  861. return
  862. }
  863. logger.Warn(logSender, "", "connection id %q to remove not found!", connectionID)
  864. }
  865. // Close closes an active connection.
  866. // It returns true on success
  867. func (conns *ActiveConnections) Close(connectionID, role string) bool {
  868. conns.RLock()
  869. var result bool
  870. if idx, ok := conns.mapping[connectionID]; ok {
  871. c := conns.connections[idx]
  872. if role == "" || c.GetRole() == role {
  873. defer func(conn ActiveConnection) {
  874. err := conn.Disconnect()
  875. logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
  876. }(c)
  877. result = true
  878. }
  879. }
  880. conns.RUnlock()
  881. return result
  882. }
  883. // AddSSHConnection adds a new ssh connection to the active ones
  884. func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
  885. conns.Lock()
  886. defer conns.Unlock()
  887. conns.sshMapping[c.GetID()] = len(conns.sshConnections)
  888. conns.sshConnections = append(conns.sshConnections, c)
  889. logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %d", len(conns.sshConnections))
  890. }
  891. // RemoveSSHConnection removes a connection from the active ones
  892. func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
  893. conns.Lock()
  894. defer conns.Unlock()
  895. if idx, ok := conns.sshMapping[connectionID]; ok {
  896. lastIdx := len(conns.sshConnections) - 1
  897. conns.sshConnections[idx] = conns.sshConnections[lastIdx]
  898. conns.sshConnections[lastIdx] = nil
  899. conns.sshConnections = conns.sshConnections[:lastIdx]
  900. delete(conns.sshMapping, connectionID)
  901. if idx != lastIdx {
  902. conns.sshMapping[conns.sshConnections[idx].GetID()] = idx
  903. }
  904. logger.Debug(logSender, connectionID, "ssh connection removed, num open ssh connections: %d", lastIdx)
  905. return
  906. }
  907. logger.Warn(logSender, "", "ssh connection to remove with id %q not found!", connectionID)
  908. }
  909. func (conns *ActiveConnections) checkIdles() {
  910. conns.RLock()
  911. for _, sshConn := range conns.sshConnections {
  912. idleTime := time.Since(sshConn.GetLastActivity())
  913. if idleTime > Config.idleTimeoutAsDuration {
  914. // we close an SSH connection if it has no active connections associated
  915. idToMatch := fmt.Sprintf("_%s_", sshConn.GetID())
  916. toClose := true
  917. for _, conn := range conns.connections {
  918. if strings.Contains(conn.GetID(), idToMatch) {
  919. if time.Since(conn.GetLastActivity()) <= Config.idleTimeoutAsDuration {
  920. toClose = false
  921. break
  922. }
  923. }
  924. }
  925. if toClose {
  926. defer func(c *SSHConnection) {
  927. err := c.Close()
  928. logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
  929. time.Since(c.GetLastActivity()), err)
  930. }(sshConn)
  931. }
  932. }
  933. }
  934. for _, c := range conns.connections {
  935. idleTime := time.Since(c.GetLastActivity())
  936. isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
  937. if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
  938. defer func(conn ActiveConnection) {
  939. err := conn.Disconnect()
  940. logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %v, username: %#v close err: %v",
  941. time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
  942. }(c)
  943. }
  944. }
  945. conns.RUnlock()
  946. }
  947. func (conns *ActiveConnections) checkTransfers() {
  948. if conns.transfersCheckStatus.Load() {
  949. logger.Warn(logSender, "", "the previous transfer check is still running, skipping execution")
  950. return
  951. }
  952. conns.transfersCheckStatus.Store(true)
  953. defer conns.transfersCheckStatus.Store(false)
  954. conns.RLock()
  955. if len(conns.connections) < 2 {
  956. conns.RUnlock()
  957. return
  958. }
  959. var wg sync.WaitGroup
  960. logger.Debug(logSender, "", "start concurrent transfers check")
  961. // update the current size for transfers to monitors
  962. for _, c := range conns.connections {
  963. for _, t := range c.GetTransfers() {
  964. if t.HasSizeLimit {
  965. wg.Add(1)
  966. go func(transfer ConnectionTransfer, connID string) {
  967. defer wg.Done()
  968. transfersChecker.UpdateTransferCurrentSizes(transfer.ULSize, transfer.DLSize, transfer.ID, connID)
  969. }(t, c.GetID())
  970. }
  971. }
  972. }
  973. conns.RUnlock()
  974. logger.Debug(logSender, "", "waiting for the update of the transfers current size")
  975. wg.Wait()
  976. logger.Debug(logSender, "", "getting overquota transfers")
  977. overquotaTransfers := transfersChecker.GetOverquotaTransfers()
  978. logger.Debug(logSender, "", "number of overquota transfers: %v", len(overquotaTransfers))
  979. if len(overquotaTransfers) == 0 {
  980. return
  981. }
  982. conns.RLock()
  983. defer conns.RUnlock()
  984. for _, c := range conns.connections {
  985. for _, overquotaTransfer := range overquotaTransfers {
  986. if c.GetID() == overquotaTransfer.ConnID {
  987. logger.Info(logSender, c.GetID(), "user %#v is overquota, try to close transfer id %v",
  988. c.GetUsername(), overquotaTransfer.TransferID)
  989. var err error
  990. if overquotaTransfer.TransferType == TransferDownload {
  991. err = getReadQuotaExceededError(c.GetProtocol())
  992. } else {
  993. err = getQuotaExceededError(c.GetProtocol())
  994. }
  995. c.SignalTransferClose(overquotaTransfer.TransferID, err)
  996. }
  997. }
  998. }
  999. logger.Debug(logSender, "", "transfers check completed")
  1000. }
  1001. // AddClientConnection stores a new client connection
  1002. func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
  1003. conns.clients.add(ipAddr)
  1004. }
  1005. // RemoveClientConnection removes a disconnected client from the tracked ones
  1006. func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
  1007. conns.clients.remove(ipAddr)
  1008. }
  1009. // GetClientConnections returns the total number of client connections
  1010. func (conns *ActiveConnections) GetClientConnections() int32 {
  1011. return conns.clients.getTotal()
  1012. }
  1013. // IsNewConnectionAllowed returns an error if the maximum number of concurrent allowed
  1014. // connections is exceeded or a whitelist is defined and the specified ipAddr is not listed
  1015. // or the service is shutting down
  1016. func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr string) error {
  1017. if isShuttingDown.Load() {
  1018. return ErrShuttingDown
  1019. }
  1020. if Config.whitelist != nil {
  1021. if !Config.whitelist.isAllowed(ipAddr) {
  1022. return ErrConnectionDenied
  1023. }
  1024. }
  1025. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  1026. return nil
  1027. }
  1028. if Config.MaxPerHostConnections > 0 {
  1029. if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
  1030. logger.Info(logSender, "", "active connections from %s %d/%d", ipAddr, total, Config.MaxPerHostConnections)
  1031. AddDefenderEvent(ipAddr, HostEventLimitExceeded)
  1032. return ErrConnectionDenied
  1033. }
  1034. }
  1035. if Config.MaxTotalConnections > 0 {
  1036. if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
  1037. logger.Info(logSender, "", "active client connections %d/%d", total, Config.MaxTotalConnections)
  1038. return ErrConnectionDenied
  1039. }
  1040. // on a single SFTP connection we could have multiple SFTP channels or commands
  1041. // so we check the estabilished connections too
  1042. conns.RLock()
  1043. defer conns.RUnlock()
  1044. if sess := len(conns.connections); sess >= Config.MaxTotalConnections {
  1045. logger.Info(logSender, "", "active client sessions %d/%d", sess, Config.MaxTotalConnections)
  1046. return ErrConnectionDenied
  1047. }
  1048. }
  1049. return nil
  1050. }
  1051. // GetStats returns stats for active connections
  1052. func (conns *ActiveConnections) GetStats(role string) []ConnectionStatus {
  1053. conns.RLock()
  1054. defer conns.RUnlock()
  1055. stats := make([]ConnectionStatus, 0, len(conns.connections))
  1056. node := dataprovider.GetNodeName()
  1057. for _, c := range conns.connections {
  1058. if role == "" || c.GetRole() == role {
  1059. stat := ConnectionStatus{
  1060. Username: c.GetUsername(),
  1061. ConnectionID: c.GetID(),
  1062. ClientVersion: c.GetClientVersion(),
  1063. RemoteAddress: c.GetRemoteAddress(),
  1064. ConnectionTime: util.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
  1065. LastActivity: util.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
  1066. Protocol: c.GetProtocol(),
  1067. Command: c.GetCommand(),
  1068. Transfers: c.GetTransfers(),
  1069. Node: node,
  1070. }
  1071. stats = append(stats, stat)
  1072. }
  1073. }
  1074. return stats
  1075. }
  1076. // ConnectionStatus returns the status for an active connection
  1077. type ConnectionStatus struct {
  1078. // Logged in username
  1079. Username string `json:"username"`
  1080. // Unique identifier for the connection
  1081. ConnectionID string `json:"connection_id"`
  1082. // client's version string
  1083. ClientVersion string `json:"client_version,omitempty"`
  1084. // Remote address for this connection
  1085. RemoteAddress string `json:"remote_address"`
  1086. // Connection time as unix timestamp in milliseconds
  1087. ConnectionTime int64 `json:"connection_time"`
  1088. // Last activity as unix timestamp in milliseconds
  1089. LastActivity int64 `json:"last_activity"`
  1090. // Protocol for this connection
  1091. Protocol string `json:"protocol"`
  1092. // active uploads/downloads
  1093. Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
  1094. // SSH command or WebDAV method
  1095. Command string `json:"command,omitempty"`
  1096. // Node identifier, omitted for single node installations
  1097. Node string `json:"node,omitempty"`
  1098. }
  1099. // GetConnectionDuration returns the connection duration as string
  1100. func (c *ConnectionStatus) GetConnectionDuration() string {
  1101. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  1102. return util.GetDurationAsString(elapsed)
  1103. }
  1104. // GetConnectionInfo returns connection info.
  1105. // Protocol,Client Version and RemoteAddress are returned.
  1106. func (c *ConnectionStatus) GetConnectionInfo() string {
  1107. var result strings.Builder
  1108. result.WriteString(fmt.Sprintf("%v. Client: %#v From: %#v", c.Protocol, c.ClientVersion, c.RemoteAddress))
  1109. if c.Command == "" {
  1110. return result.String()
  1111. }
  1112. switch c.Protocol {
  1113. case ProtocolSSH, ProtocolFTP:
  1114. result.WriteString(fmt.Sprintf(". Command: %#v", c.Command))
  1115. case ProtocolWebDAV:
  1116. result.WriteString(fmt.Sprintf(". Method: %#v", c.Command))
  1117. }
  1118. return result.String()
  1119. }
  1120. // GetTransfersAsString returns the active transfers as string
  1121. func (c *ConnectionStatus) GetTransfersAsString() string {
  1122. result := ""
  1123. for _, t := range c.Transfers {
  1124. if result != "" {
  1125. result += ". "
  1126. }
  1127. result += t.getConnectionTransferAsString()
  1128. }
  1129. return result
  1130. }
  1131. // ActiveQuotaScan defines an active quota scan for a user
  1132. type ActiveQuotaScan struct {
  1133. // Username to which the quota scan refers
  1134. Username string `json:"username"`
  1135. // quota scan start time as unix timestamp in milliseconds
  1136. StartTime int64 `json:"start_time"`
  1137. Role string `json:"-"`
  1138. }
  1139. // ActiveVirtualFolderQuotaScan defines an active quota scan for a virtual folder
  1140. type ActiveVirtualFolderQuotaScan struct {
  1141. // folder name to which the quota scan refers
  1142. Name string `json:"name"`
  1143. // quota scan start time as unix timestamp in milliseconds
  1144. StartTime int64 `json:"start_time"`
  1145. }
  1146. // ActiveScans holds the active quota scans
  1147. type ActiveScans struct {
  1148. sync.RWMutex
  1149. UserScans []ActiveQuotaScan
  1150. FolderScans []ActiveVirtualFolderQuotaScan
  1151. }
  1152. // GetUsersQuotaScans returns the active users quota scans
  1153. func (s *ActiveScans) GetUsersQuotaScans(role string) []ActiveQuotaScan {
  1154. s.RLock()
  1155. defer s.RUnlock()
  1156. scans := make([]ActiveQuotaScan, 0, len(s.UserScans))
  1157. for _, scan := range s.UserScans {
  1158. if role == "" || role == scan.Role {
  1159. scans = append(scans, ActiveQuotaScan{
  1160. Username: scan.Username,
  1161. StartTime: scan.StartTime,
  1162. })
  1163. }
  1164. }
  1165. return scans
  1166. }
  1167. // AddUserQuotaScan adds a user to the ones with active quota scans.
  1168. // Returns false if the user has a quota scan already running
  1169. func (s *ActiveScans) AddUserQuotaScan(username, role string) bool {
  1170. s.Lock()
  1171. defer s.Unlock()
  1172. for _, scan := range s.UserScans {
  1173. if scan.Username == username {
  1174. return false
  1175. }
  1176. }
  1177. s.UserScans = append(s.UserScans, ActiveQuotaScan{
  1178. Username: username,
  1179. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1180. Role: role,
  1181. })
  1182. return true
  1183. }
  1184. // RemoveUserQuotaScan removes a user from the ones with active quota scans.
  1185. // Returns false if the user has no active quota scans
  1186. func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
  1187. s.Lock()
  1188. defer s.Unlock()
  1189. for idx, scan := range s.UserScans {
  1190. if scan.Username == username {
  1191. lastIdx := len(s.UserScans) - 1
  1192. s.UserScans[idx] = s.UserScans[lastIdx]
  1193. s.UserScans = s.UserScans[:lastIdx]
  1194. return true
  1195. }
  1196. }
  1197. return false
  1198. }
  1199. // GetVFoldersQuotaScans returns the active quota scans for virtual folders
  1200. func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
  1201. s.RLock()
  1202. defer s.RUnlock()
  1203. scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
  1204. copy(scans, s.FolderScans)
  1205. return scans
  1206. }
  1207. // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
  1208. // Returns false if the folder has a quota scan already running
  1209. func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
  1210. s.Lock()
  1211. defer s.Unlock()
  1212. for _, scan := range s.FolderScans {
  1213. if scan.Name == folderName {
  1214. return false
  1215. }
  1216. }
  1217. s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
  1218. Name: folderName,
  1219. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1220. })
  1221. return true
  1222. }
  1223. // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
  1224. // Returns false if the folder has no active quota scans
  1225. func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
  1226. s.Lock()
  1227. defer s.Unlock()
  1228. for idx, scan := range s.FolderScans {
  1229. if scan.Name == folderName {
  1230. lastIdx := len(s.FolderScans) - 1
  1231. s.FolderScans[idx] = s.FolderScans[lastIdx]
  1232. s.FolderScans = s.FolderScans[:lastIdx]
  1233. return true
  1234. }
  1235. }
  1236. return false
  1237. }
  1238. // MetadataCheck defines an active metadata check
  1239. type MetadataCheck struct {
  1240. // Username to which the metadata check refers
  1241. Username string `json:"username"`
  1242. // check start time as unix timestamp in milliseconds
  1243. StartTime int64 `json:"start_time"`
  1244. Role string `json:"-"`
  1245. }
  1246. // MetadataChecks holds the active metadata checks
  1247. type MetadataChecks struct {
  1248. sync.RWMutex
  1249. checks []MetadataCheck
  1250. }
  1251. // Get returns the active metadata checks
  1252. func (c *MetadataChecks) Get(role string) []MetadataCheck {
  1253. c.RLock()
  1254. defer c.RUnlock()
  1255. checks := make([]MetadataCheck, 0, len(c.checks))
  1256. for _, check := range c.checks {
  1257. if role == "" || role == check.Role {
  1258. checks = append(checks, MetadataCheck{
  1259. Username: check.Username,
  1260. StartTime: check.StartTime,
  1261. })
  1262. }
  1263. }
  1264. return checks
  1265. }
  1266. // Add adds a user to the ones with active metadata checks.
  1267. // Return false if a metadata check is already active for the specified user
  1268. func (c *MetadataChecks) Add(username, role string) bool {
  1269. c.Lock()
  1270. defer c.Unlock()
  1271. for idx := range c.checks {
  1272. if c.checks[idx].Username == username {
  1273. return false
  1274. }
  1275. }
  1276. c.checks = append(c.checks, MetadataCheck{
  1277. Username: username,
  1278. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1279. Role: role,
  1280. })
  1281. return true
  1282. }
  1283. // Remove removes a user from the ones with active metadata checks
  1284. func (c *MetadataChecks) Remove(username string) bool {
  1285. c.Lock()
  1286. defer c.Unlock()
  1287. for idx := range c.checks {
  1288. if c.checks[idx].Username == username {
  1289. lastIdx := len(c.checks) - 1
  1290. c.checks[idx] = c.checks[lastIdx]
  1291. c.checks = c.checks[:lastIdx]
  1292. return true
  1293. }
  1294. }
  1295. return false
  1296. }