common.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package common defines code shared among file transfer packages and protocols
  15. package common
  16. import (
  17. "context"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. "path/filepath"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/pires/go-proxyproto"
  32. "github.com/sftpgo/sdk/plugin/notifier"
  33. "github.com/drakkan/sftpgo/v2/internal/command"
  34. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  35. "github.com/drakkan/sftpgo/v2/internal/httpclient"
  36. "github.com/drakkan/sftpgo/v2/internal/logger"
  37. "github.com/drakkan/sftpgo/v2/internal/metric"
  38. "github.com/drakkan/sftpgo/v2/internal/plugin"
  39. "github.com/drakkan/sftpgo/v2/internal/smtp"
  40. "github.com/drakkan/sftpgo/v2/internal/util"
  41. "github.com/drakkan/sftpgo/v2/internal/vfs"
  42. )
  43. // constants
  44. const (
  45. logSender = "common"
  46. uploadLogSender = "Upload"
  47. downloadLogSender = "Download"
  48. renameLogSender = "Rename"
  49. rmdirLogSender = "Rmdir"
  50. mkdirLogSender = "Mkdir"
  51. symlinkLogSender = "Symlink"
  52. removeLogSender = "Remove"
  53. chownLogSender = "Chown"
  54. chmodLogSender = "Chmod"
  55. chtimesLogSender = "Chtimes"
  56. copyLogSender = "Copy"
  57. truncateLogSender = "Truncate"
  58. operationDownload = "download"
  59. operationUpload = "upload"
  60. operationFirstDownload = "first-download"
  61. operationFirstUpload = "first-upload"
  62. operationDelete = "delete"
  63. operationCopy = "copy"
  64. // Pre-download action name
  65. OperationPreDownload = "pre-download"
  66. // Pre-upload action name
  67. OperationPreUpload = "pre-upload"
  68. operationPreDelete = "pre-delete"
  69. operationRename = "rename"
  70. operationMkdir = "mkdir"
  71. operationRmdir = "rmdir"
  72. // SSH command action name
  73. OperationSSHCmd = "ssh_cmd"
  74. chtimesFormat = "2006-01-02T15:04:05" // YYYY-MM-DDTHH:MM:SS
  75. idleTimeoutCheckInterval = 3 * time.Minute
  76. periodicTimeoutCheckInterval = 1 * time.Minute
  77. )
  78. // Stat flags
  79. const (
  80. StatAttrUIDGID = 1
  81. StatAttrPerms = 2
  82. StatAttrTimes = 4
  83. StatAttrSize = 8
  84. )
  85. // Transfer types
  86. const (
  87. TransferUpload = iota
  88. TransferDownload
  89. )
  90. // Supported protocols
  91. const (
  92. ProtocolSFTP = "SFTP"
  93. ProtocolSCP = "SCP"
  94. ProtocolSSH = "SSH"
  95. ProtocolFTP = "FTP"
  96. ProtocolWebDAV = "DAV"
  97. ProtocolHTTP = "HTTP"
  98. ProtocolHTTPShare = "HTTPShare"
  99. ProtocolDataRetention = "DataRetention"
  100. ProtocolOIDC = "OIDC"
  101. protocolEventAction = "EventAction"
  102. )
  103. // Upload modes
  104. const (
  105. UploadModeStandard = iota
  106. UploadModeAtomic
  107. UploadModeAtomicWithResume
  108. )
  109. func init() {
  110. Connections.clients = clientsMap{
  111. clients: make(map[string]int),
  112. }
  113. Connections.perUserConns = make(map[string]int)
  114. Connections.mapping = make(map[string]int)
  115. Connections.sshMapping = make(map[string]int)
  116. }
  117. // errors definitions
  118. var (
  119. ErrPermissionDenied = errors.New("permission denied")
  120. ErrNotExist = errors.New("no such file or directory")
  121. ErrOpUnsupported = errors.New("operation unsupported")
  122. ErrGenericFailure = errors.New("failure")
  123. ErrQuotaExceeded = errors.New("denying write due to space limit")
  124. ErrReadQuotaExceeded = errors.New("denying read due to quota limit")
  125. ErrConnectionDenied = errors.New("you are not allowed to connect")
  126. ErrNoBinding = errors.New("no binding configured")
  127. ErrCrtRevoked = errors.New("your certificate has been revoked")
  128. ErrNoCredentials = errors.New("no credential provided")
  129. ErrInternalFailure = errors.New("internal failure")
  130. ErrTransferAborted = errors.New("transfer aborted")
  131. ErrShuttingDown = errors.New("the service is shutting down")
  132. errNoTransfer = errors.New("requested transfer not found")
  133. errTransferMismatch = errors.New("transfer mismatch")
  134. )
  135. var (
  136. // Config is the configuration for the supported protocols
  137. Config Configuration
  138. // Connections is the list of active connections
  139. Connections ActiveConnections
  140. // QuotaScans is the list of active quota scans
  141. QuotaScans ActiveScans
  142. // ActiveMetadataChecks holds the active metadata checks
  143. ActiveMetadataChecks MetadataChecks
  144. transfersChecker TransfersChecker
  145. supportedProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV,
  146. ProtocolHTTP, ProtocolHTTPShare, ProtocolOIDC}
  147. disconnHookProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP}
  148. // the map key is the protocol, for each protocol we can have multiple rate limiters
  149. rateLimiters map[string][]*rateLimiter
  150. isShuttingDown atomic.Bool
  151. ftpLoginCommands = []string{"PASS", "USER"}
  152. )
  153. // Initialize sets the common configuration
  154. func Initialize(c Configuration, isShared int) error {
  155. isShuttingDown.Store(false)
  156. util.SetUmask(c.Umask)
  157. Config = c
  158. Config.Actions.ExecuteOn = util.RemoveDuplicates(Config.Actions.ExecuteOn, true)
  159. Config.Actions.ExecuteSync = util.RemoveDuplicates(Config.Actions.ExecuteSync, true)
  160. Config.ProxyAllowed = util.RemoveDuplicates(Config.ProxyAllowed, true)
  161. Config.idleLoginTimeout = 2 * time.Minute
  162. Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
  163. startPeriodicChecks(periodicTimeoutCheckInterval, isShared)
  164. Config.defender = nil
  165. Config.allowList = nil
  166. Config.rateLimitersList = nil
  167. rateLimiters = make(map[string][]*rateLimiter)
  168. for _, rlCfg := range c.RateLimitersConfig {
  169. if rlCfg.isEnabled() {
  170. if err := rlCfg.validate(); err != nil {
  171. return fmt.Errorf("rate limiters initialization error: %w", err)
  172. }
  173. rateLimiter := rlCfg.getLimiter()
  174. for _, protocol := range rlCfg.Protocols {
  175. rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
  176. }
  177. }
  178. }
  179. if len(rateLimiters) > 0 {
  180. rateLimitersList, err := dataprovider.NewIPList(dataprovider.IPListTypeRateLimiterSafeList)
  181. if err != nil {
  182. return fmt.Errorf("unable to initialize ratelimiters list: %w", err)
  183. }
  184. Config.rateLimitersList = rateLimitersList
  185. }
  186. if c.DefenderConfig.Enabled {
  187. if !util.Contains(supportedDefenderDrivers, c.DefenderConfig.Driver) {
  188. return fmt.Errorf("unsupported defender driver %q", c.DefenderConfig.Driver)
  189. }
  190. var defender Defender
  191. var err error
  192. switch c.DefenderConfig.Driver {
  193. case DefenderDriverProvider:
  194. defender, err = newDBDefender(&c.DefenderConfig)
  195. default:
  196. defender, err = newInMemoryDefender(&c.DefenderConfig)
  197. }
  198. if err != nil {
  199. return fmt.Errorf("defender initialization error: %v", err)
  200. }
  201. logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
  202. Config.defender = defender
  203. }
  204. if c.AllowListStatus > 0 {
  205. allowList, err := dataprovider.NewIPList(dataprovider.IPListTypeAllowList)
  206. if err != nil {
  207. return fmt.Errorf("unable to initialize the allow list: %w", err)
  208. }
  209. logger.Info(logSender, "", "allow list initialized")
  210. Config.allowList = allowList
  211. }
  212. if err := c.initializeProxyProtocol(); err != nil {
  213. return err
  214. }
  215. vfs.SetTempPath(c.TempPath)
  216. dataprovider.SetTempPath(c.TempPath)
  217. vfs.SetAllowSelfConnections(c.AllowSelfConnections)
  218. vfs.SetRenameMode(c.RenameMode)
  219. vfs.SetReadMetadataMode(c.Metadata.Read)
  220. vfs.SetResumeMaxSize(c.ResumeMaxSize)
  221. dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
  222. transfersChecker = getTransfersChecker(isShared)
  223. return nil
  224. }
  225. // CheckClosing returns an error if the service is closing
  226. func CheckClosing() error {
  227. if isShuttingDown.Load() {
  228. return ErrShuttingDown
  229. }
  230. return nil
  231. }
  232. // WaitForTransfers waits, for the specified grace time, for currently ongoing
  233. // client-initiated transfer sessions to completes.
  234. // A zero graceTime means no wait
  235. func WaitForTransfers(graceTime int) {
  236. if graceTime == 0 {
  237. return
  238. }
  239. if isShuttingDown.Swap(true) {
  240. return
  241. }
  242. if activeHooks.Load() == 0 && getActiveConnections() == 0 {
  243. return
  244. }
  245. graceTimer := time.NewTimer(time.Duration(graceTime) * time.Second)
  246. ticker := time.NewTicker(3 * time.Second)
  247. for {
  248. select {
  249. case <-ticker.C:
  250. hooks := activeHooks.Load()
  251. logger.Info(logSender, "", "active hooks: %d", hooks)
  252. if hooks == 0 && getActiveConnections() == 0 {
  253. logger.Info(logSender, "", "no more active connections, graceful shutdown")
  254. ticker.Stop()
  255. graceTimer.Stop()
  256. return
  257. }
  258. case <-graceTimer.C:
  259. logger.Info(logSender, "", "grace time expired, hard shutdown")
  260. ticker.Stop()
  261. return
  262. }
  263. }
  264. }
  265. // getActiveConnections returns the number of connections with active transfers
  266. func getActiveConnections() int {
  267. var activeConns int
  268. Connections.RLock()
  269. for _, c := range Connections.connections {
  270. if len(c.GetTransfers()) > 0 {
  271. activeConns++
  272. }
  273. }
  274. Connections.RUnlock()
  275. logger.Info(logSender, "", "number of connections with active transfers: %d", activeConns)
  276. return activeConns
  277. }
  278. // LimitRate blocks until all the configured rate limiters
  279. // allow one event to happen.
  280. // It returns an error if the time to wait exceeds the max
  281. // allowed delay
  282. func LimitRate(protocol, ip string) (time.Duration, error) {
  283. if Config.rateLimitersList != nil {
  284. isListed, _, err := Config.rateLimitersList.IsListed(ip, protocol)
  285. if err == nil && isListed {
  286. return 0, nil
  287. }
  288. }
  289. for _, limiter := range rateLimiters[protocol] {
  290. if delay, err := limiter.Wait(ip, protocol); err != nil {
  291. logger.Debug(logSender, "", "protocol %s ip %s: %v", protocol, ip, err)
  292. return delay, err
  293. }
  294. }
  295. return 0, nil
  296. }
  297. // Reload reloads the whitelist, the IP filter plugin and the defender's block and safe lists
  298. func Reload() error {
  299. plugin.Handler.ReloadFilter()
  300. return nil
  301. }
  302. // IsBanned returns true if the specified IP address is banned
  303. func IsBanned(ip, protocol string) bool {
  304. if plugin.Handler.IsIPBanned(ip, protocol) {
  305. return true
  306. }
  307. if Config.defender == nil {
  308. return false
  309. }
  310. return Config.defender.IsBanned(ip, protocol)
  311. }
  312. // GetDefenderBanTime returns the ban time for the given IP
  313. // or nil if the IP is not banned or the defender is disabled
  314. func GetDefenderBanTime(ip string) (*time.Time, error) {
  315. if Config.defender == nil {
  316. return nil, nil
  317. }
  318. return Config.defender.GetBanTime(ip)
  319. }
  320. // GetDefenderHosts returns hosts that are banned or for which some violations have been detected
  321. func GetDefenderHosts() ([]dataprovider.DefenderEntry, error) {
  322. if Config.defender == nil {
  323. return nil, nil
  324. }
  325. return Config.defender.GetHosts()
  326. }
  327. // GetDefenderHost returns a defender host by ip, if any
  328. func GetDefenderHost(ip string) (dataprovider.DefenderEntry, error) {
  329. if Config.defender == nil {
  330. return dataprovider.DefenderEntry{}, errors.New("defender is disabled")
  331. }
  332. return Config.defender.GetHost(ip)
  333. }
  334. // DeleteDefenderHost removes the specified IP address from the defender lists
  335. func DeleteDefenderHost(ip string) bool {
  336. if Config.defender == nil {
  337. return false
  338. }
  339. return Config.defender.DeleteHost(ip)
  340. }
  341. // GetDefenderScore returns the score for the given IP
  342. func GetDefenderScore(ip string) (int, error) {
  343. if Config.defender == nil {
  344. return 0, nil
  345. }
  346. return Config.defender.GetScore(ip)
  347. }
  348. // AddDefenderEvent adds the specified defender event for the given IP
  349. func AddDefenderEvent(ip, protocol string, event HostEvent) {
  350. if Config.defender == nil {
  351. return
  352. }
  353. Config.defender.AddEvent(ip, protocol, event)
  354. }
  355. func startPeriodicChecks(duration time.Duration, isShared int) {
  356. startEventScheduler()
  357. spec := fmt.Sprintf("@every %s", duration)
  358. _, err := eventScheduler.AddFunc(spec, Connections.checkTransfers)
  359. util.PanicOnError(err)
  360. logger.Info(logSender, "", "scheduled overquota transfers check, schedule %q", spec)
  361. if isShared == 1 {
  362. logger.Info(logSender, "", "add reload configs task")
  363. _, err := eventScheduler.AddFunc("@every 10m", smtp.ReloadProviderConf)
  364. util.PanicOnError(err)
  365. }
  366. if Config.IdleTimeout > 0 {
  367. ratio := idleTimeoutCheckInterval / periodicTimeoutCheckInterval
  368. spec = fmt.Sprintf("@every %s", duration*ratio)
  369. _, err = eventScheduler.AddFunc(spec, Connections.checkIdles)
  370. util.PanicOnError(err)
  371. logger.Info(logSender, "", "scheduled idle connections check, schedule %q", spec)
  372. }
  373. }
  374. // ActiveTransfer defines the interface for the current active transfers
  375. type ActiveTransfer interface {
  376. GetID() int64
  377. GetType() int
  378. GetSize() int64
  379. GetDownloadedSize() int64
  380. GetUploadedSize() int64
  381. GetVirtualPath() string
  382. GetStartTime() time.Time
  383. SignalClose(err error)
  384. Truncate(fsPath string, size int64) (int64, error)
  385. GetRealFsPath(fsPath string) string
  386. SetTimes(fsPath string, atime time.Time, mtime time.Time) bool
  387. GetTruncatedSize() int64
  388. HasSizeLimit() bool
  389. }
  390. // ActiveConnection defines the interface for the current active connections
  391. type ActiveConnection interface {
  392. GetID() string
  393. GetUsername() string
  394. GetRole() string
  395. GetMaxSessions() int
  396. GetLocalAddress() string
  397. GetRemoteAddress() string
  398. GetClientVersion() string
  399. GetProtocol() string
  400. GetConnectionTime() time.Time
  401. GetLastActivity() time.Time
  402. GetCommand() string
  403. Disconnect() error
  404. AddTransfer(t ActiveTransfer)
  405. RemoveTransfer(t ActiveTransfer)
  406. GetTransfers() []ConnectionTransfer
  407. SignalTransferClose(transferID int64, err error)
  408. CloseFS() error
  409. }
  410. // StatAttributes defines the attributes for set stat commands
  411. type StatAttributes struct {
  412. Mode os.FileMode
  413. Atime time.Time
  414. Mtime time.Time
  415. UID int
  416. GID int
  417. Flags int
  418. Size int64
  419. }
  420. // ConnectionTransfer defines the trasfer details
  421. type ConnectionTransfer struct {
  422. ID int64 `json:"-"`
  423. OperationType string `json:"operation_type"`
  424. StartTime int64 `json:"start_time"`
  425. Size int64 `json:"size"`
  426. VirtualPath string `json:"path"`
  427. HasSizeLimit bool `json:"-"`
  428. ULSize int64 `json:"-"`
  429. DLSize int64 `json:"-"`
  430. }
  431. func (t *ConnectionTransfer) getConnectionTransferAsString() string {
  432. result := ""
  433. switch t.OperationType {
  434. case operationUpload:
  435. result += "UL "
  436. case operationDownload:
  437. result += "DL "
  438. }
  439. result += fmt.Sprintf("%q ", t.VirtualPath)
  440. if t.Size > 0 {
  441. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(t.StartTime))
  442. speed := float64(t.Size) / float64(util.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  443. result += fmt.Sprintf("Size: %s Elapsed: %s Speed: \"%.1f KB/s\"", util.ByteCountIEC(t.Size),
  444. util.GetDurationAsString(elapsed), speed)
  445. }
  446. return result
  447. }
  448. // MetadataConfig defines how to handle metadata for cloud storage backends
  449. type MetadataConfig struct {
  450. // If not zero the metadata will be read before downloads and will be
  451. // available in notifications
  452. Read int `json:"read" mapstructure:"read"`
  453. }
  454. // Configuration defines configuration parameters common to all supported protocols
  455. type Configuration struct {
  456. // Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
  457. // 0 means disabled
  458. IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
  459. // UploadMode 0 means standard, the files are uploaded directly to the requested path.
  460. // 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
  461. // when the client ends the upload. Atomic mode avoid problems such as a web server that
  462. // serves partial files when the files are being uploaded.
  463. // In atomic mode if there is an upload error the temporary file is deleted and so the requested
  464. // upload path will not contain a partial file.
  465. // 2 means atomic with resume support: as atomic but if there is an upload error the temporary
  466. // file is renamed to the requested path and not deleted, this way a client can reconnect and resume
  467. // the upload.
  468. UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
  469. // Actions to execute for SFTP file operations and SSH commands
  470. Actions ProtocolActions `json:"actions" mapstructure:"actions"`
  471. // SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
  472. // 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
  473. // 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group are
  474. // silently ignored for cloud based filesystem such as S3, GCS, Azure Blob. Requests for changing
  475. // modification times are ignored for cloud based filesystem if they are not supported.
  476. SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
  477. // RenameMode defines how to handle directory renames. By default, renaming of non-empty directories
  478. // is not allowed for cloud storage providers (S3, GCS, Azure Blob). Set to 1 to enable recursive
  479. // renames for these providers, they may be slow, there is no atomic rename API like for local
  480. // filesystem, so SFTPGo will recursively list the directory contents and do a rename for each entry
  481. RenameMode int `json:"rename_mode" mapstructure:"rename_mode"`
  482. // ResumeMaxSize defines the maximum size allowed, in bytes, to resume uploads on storage backends
  483. // with immutable objects. By default, resuming uploads is not allowed for cloud storage providers
  484. // (S3, GCS, Azure Blob) because SFTPGo must rewrite the entire file.
  485. // Set to a value greater than 0 to allow resuming uploads of files smaller than or equal to the
  486. // defined size.
  487. ResumeMaxSize int64 `json:"resume_max_size" mapstructure:"resume_max_size"`
  488. // TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
  489. // If you set this option you must make sure that the defined path exists, is accessible for writing
  490. // by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise
  491. // the renaming for atomic uploads will become a copy and therefore may take a long time.
  492. // The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
  493. TempPath string `json:"temp_path" mapstructure:"temp_path"`
  494. // Support for HAProxy PROXY protocol.
  495. // If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGNIX, you can enable
  496. // the proxy protocol. It provides a convenient way to safely transport connection information
  497. // such as a client's address across multiple layers of NAT or TCP proxies to get the real
  498. // client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
  499. // - 0 means disabled
  500. // - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
  501. // - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
  502. // If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
  503. // for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
  504. ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
  505. // List of IP addresses and IP ranges allowed to send the proxy header.
  506. // If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
  507. // connection will be accepted and the header will be ignored.
  508. // If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
  509. // connection will be rejected.
  510. ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
  511. // List of IP addresses and IP ranges for which not to read the proxy header
  512. ProxySkipped []string `json:"proxy_skipped" mapstructure:"proxy_skipped"`
  513. // Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
  514. // If you define an HTTP URL it will be invoked using a `GET` request.
  515. // Please note that SFTPGo services may not yet be available when this hook is run.
  516. // Leave empty do disable.
  517. StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
  518. // Absolute path to an external program or an HTTP URL to invoke after a user connects
  519. // and before he tries to login. It allows you to reject the connection based on the source
  520. // ip address. Leave empty do disable.
  521. PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
  522. // Absolute path to an external program or an HTTP URL to invoke after an SSH/FTP connection ends.
  523. // Leave empty do disable.
  524. PostDisconnectHook string `json:"post_disconnect_hook" mapstructure:"post_disconnect_hook"`
  525. // Absolute path to an external program or an HTTP URL to invoke after a data retention check completes.
  526. // Leave empty do disable.
  527. DataRetentionHook string `json:"data_retention_hook" mapstructure:"data_retention_hook"`
  528. // Maximum number of concurrent client connections. 0 means unlimited
  529. MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
  530. // Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
  531. MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
  532. // Defines the status of the global allow list. 0 means disabled, 1 enabled.
  533. // If enabled, only the listed IPs/networks can access the configured services, all other
  534. // client connections will be dropped before they even try to authenticate.
  535. // Ensure to enable this setting only after adding some allowed ip/networks from the WebAdmin/REST API
  536. AllowListStatus int `json:"allowlist_status" mapstructure:"allowlist_status"`
  537. // Allow users on this instance to use other users/virtual folders on this instance as storage backend.
  538. // Enable this setting if you know what you are doing.
  539. AllowSelfConnections int `json:"allow_self_connections" mapstructure:"allow_self_connections"`
  540. // Defender configuration
  541. DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
  542. // Rate limiter configurations
  543. RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
  544. // Umask for new uploads. Leave blank to use the system default.
  545. Umask string `json:"umask" mapstructure:"umask"`
  546. // Metadata configuration
  547. Metadata MetadataConfig `json:"metadata" mapstructure:"metadata"`
  548. idleTimeoutAsDuration time.Duration
  549. idleLoginTimeout time.Duration
  550. defender Defender
  551. allowList *dataprovider.IPList
  552. rateLimitersList *dataprovider.IPList
  553. proxyAllowed []func(net.IP) bool
  554. proxySkipped []func(net.IP) bool
  555. }
  556. // IsAtomicUploadEnabled returns true if atomic upload is enabled
  557. func (c *Configuration) IsAtomicUploadEnabled() bool {
  558. return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
  559. }
  560. func (c *Configuration) initializeProxyProtocol() error {
  561. if c.ProxyProtocol > 0 {
  562. allowed, err := util.ParseAllowedIPAndRanges(c.ProxyAllowed)
  563. if err != nil {
  564. return fmt.Errorf("invalid proxy allowed: %w", err)
  565. }
  566. skipped, err := util.ParseAllowedIPAndRanges(c.ProxySkipped)
  567. if err != nil {
  568. return fmt.Errorf("invalid proxy skipped: %w", err)
  569. }
  570. Config.proxyAllowed = allowed
  571. Config.proxySkipped = skipped
  572. }
  573. return nil
  574. }
  575. // GetProxyListener returns a wrapper for the given listener that supports the
  576. // HAProxy Proxy Protocol
  577. func (c *Configuration) GetProxyListener(listener net.Listener) (*proxyproto.Listener, error) {
  578. if c.ProxyProtocol > 0 {
  579. defaultPolicy := proxyproto.REQUIRE
  580. if c.ProxyProtocol == 1 {
  581. defaultPolicy = proxyproto.IGNORE
  582. }
  583. return &proxyproto.Listener{
  584. Listener: listener,
  585. Policy: getProxyPolicy(c.proxyAllowed, c.proxySkipped, defaultPolicy),
  586. ReadHeaderTimeout: 10 * time.Second,
  587. }, nil
  588. }
  589. return nil, errors.New("proxy protocol not configured")
  590. }
  591. // GetRateLimitersStatus returns the rate limiters status
  592. func (c *Configuration) GetRateLimitersStatus() (bool, []string) {
  593. enabled := false
  594. var protocols []string
  595. for _, rlCfg := range c.RateLimitersConfig {
  596. if rlCfg.isEnabled() {
  597. enabled = true
  598. protocols = append(protocols, rlCfg.Protocols...)
  599. }
  600. }
  601. return enabled, util.RemoveDuplicates(protocols, false)
  602. }
  603. // IsAllowListEnabled returns true if the global allow list is enabled
  604. func (c *Configuration) IsAllowListEnabled() bool {
  605. return c.AllowListStatus > 0
  606. }
  607. // ExecuteStartupHook runs the startup hook if defined
  608. func (c *Configuration) ExecuteStartupHook() error {
  609. if c.StartupHook == "" {
  610. return nil
  611. }
  612. if strings.HasPrefix(c.StartupHook, "http") {
  613. var url *url.URL
  614. url, err := url.Parse(c.StartupHook)
  615. if err != nil {
  616. logger.Warn(logSender, "", "Invalid startup hook %q: %v", c.StartupHook, err)
  617. return err
  618. }
  619. startTime := time.Now()
  620. resp, err := httpclient.RetryableGet(url.String())
  621. if err != nil {
  622. logger.Warn(logSender, "", "Error executing startup hook: %v", err)
  623. return err
  624. }
  625. defer resp.Body.Close()
  626. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
  627. return nil
  628. }
  629. if !filepath.IsAbs(c.StartupHook) {
  630. err := fmt.Errorf("invalid startup hook %q", c.StartupHook)
  631. logger.Warn(logSender, "", "Invalid startup hook %q", c.StartupHook)
  632. return err
  633. }
  634. startTime := time.Now()
  635. timeout, env, args := command.GetConfig(c.StartupHook, command.HookStartup)
  636. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  637. defer cancel()
  638. cmd := exec.CommandContext(ctx, c.StartupHook, args...)
  639. cmd.Env = env
  640. err := cmd.Run()
  641. logger.Debug(logSender, "", "Startup hook executed, elapsed: %s, error: %v", time.Since(startTime), err)
  642. return nil
  643. }
  644. func (c *Configuration) executePostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  645. startNewHook()
  646. defer hookEnded()
  647. ipAddr := util.GetIPFromRemoteAddress(remoteAddr)
  648. connDuration := int64(time.Since(connectionTime) / time.Millisecond)
  649. if strings.HasPrefix(c.PostDisconnectHook, "http") {
  650. var url *url.URL
  651. url, err := url.Parse(c.PostDisconnectHook)
  652. if err != nil {
  653. logger.Warn(protocol, connID, "Invalid post disconnect hook %q: %v", c.PostDisconnectHook, err)
  654. return
  655. }
  656. q := url.Query()
  657. q.Add("ip", ipAddr)
  658. q.Add("protocol", protocol)
  659. q.Add("username", username)
  660. q.Add("connection_duration", strconv.FormatInt(connDuration, 10))
  661. url.RawQuery = q.Encode()
  662. startTime := time.Now()
  663. resp, err := httpclient.RetryableGet(url.String())
  664. respCode := 0
  665. if err == nil {
  666. respCode = resp.StatusCode
  667. resp.Body.Close()
  668. }
  669. logger.Debug(protocol, connID, "Post disconnect hook response code: %v, elapsed: %v, err: %v",
  670. respCode, time.Since(startTime), err)
  671. return
  672. }
  673. if !filepath.IsAbs(c.PostDisconnectHook) {
  674. logger.Debug(protocol, connID, "invalid post disconnect hook %q", c.PostDisconnectHook)
  675. return
  676. }
  677. timeout, env, args := command.GetConfig(c.PostDisconnectHook, command.HookPostDisconnect)
  678. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  679. defer cancel()
  680. startTime := time.Now()
  681. cmd := exec.CommandContext(ctx, c.PostDisconnectHook, args...)
  682. cmd.Env = append(env,
  683. fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
  684. fmt.Sprintf("SFTPGO_CONNECTION_USERNAME=%s", username),
  685. fmt.Sprintf("SFTPGO_CONNECTION_DURATION=%d", connDuration),
  686. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
  687. err := cmd.Run()
  688. logger.Debug(protocol, connID, "Post disconnect hook executed, elapsed: %s error: %v", time.Since(startTime), err)
  689. }
  690. func (c *Configuration) checkPostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  691. if c.PostDisconnectHook == "" {
  692. return
  693. }
  694. if !util.Contains(disconnHookProtocols, protocol) {
  695. return
  696. }
  697. go c.executePostDisconnectHook(remoteAddr, protocol, username, connID, connectionTime)
  698. }
  699. // ExecutePostConnectHook executes the post connect hook if defined
  700. func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
  701. if c.PostConnectHook == "" {
  702. return nil
  703. }
  704. if strings.HasPrefix(c.PostConnectHook, "http") {
  705. var url *url.URL
  706. url, err := url.Parse(c.PostConnectHook)
  707. if err != nil {
  708. logger.Warn(protocol, "", "Login from ip %q denied, invalid post connect hook %q: %v",
  709. ipAddr, c.PostConnectHook, err)
  710. return getPermissionDeniedError(protocol)
  711. }
  712. q := url.Query()
  713. q.Add("ip", ipAddr)
  714. q.Add("protocol", protocol)
  715. url.RawQuery = q.Encode()
  716. resp, err := httpclient.RetryableGet(url.String())
  717. if err != nil {
  718. logger.Warn(protocol, "", "Login from ip %q denied, error executing post connect hook: %v", ipAddr, err)
  719. return getPermissionDeniedError(protocol)
  720. }
  721. defer resp.Body.Close()
  722. if resp.StatusCode != http.StatusOK {
  723. logger.Warn(protocol, "", "Login from ip %q denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
  724. return getPermissionDeniedError(protocol)
  725. }
  726. return nil
  727. }
  728. if !filepath.IsAbs(c.PostConnectHook) {
  729. err := fmt.Errorf("invalid post connect hook %q", c.PostConnectHook)
  730. logger.Warn(protocol, "", "Login from ip %q denied: %v", ipAddr, err)
  731. return getPermissionDeniedError(protocol)
  732. }
  733. timeout, env, args := command.GetConfig(c.PostConnectHook, command.HookPostConnect)
  734. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  735. defer cancel()
  736. cmd := exec.CommandContext(ctx, c.PostConnectHook, args...)
  737. cmd.Env = append(env,
  738. fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
  739. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
  740. err := cmd.Run()
  741. if err != nil {
  742. logger.Warn(protocol, "", "Login from ip %q denied, connect hook error: %v", ipAddr, err)
  743. return getPermissionDeniedError(protocol)
  744. }
  745. return nil
  746. }
  747. func getProxyPolicy(allowed, skipped []func(net.IP) bool, def proxyproto.Policy) proxyproto.PolicyFunc {
  748. return func(upstream net.Addr) (proxyproto.Policy, error) {
  749. upstreamIP, err := util.GetIPFromNetAddr(upstream)
  750. if err != nil {
  751. // something is wrong with the source IP, better reject the connection
  752. return proxyproto.REJECT, err
  753. }
  754. for _, skippedFrom := range skipped {
  755. if skippedFrom(upstreamIP) {
  756. return proxyproto.SKIP, nil
  757. }
  758. }
  759. for _, allowFrom := range allowed {
  760. if allowFrom(upstreamIP) {
  761. if def == proxyproto.REQUIRE {
  762. return proxyproto.REQUIRE, nil
  763. }
  764. return proxyproto.USE, nil
  765. }
  766. }
  767. return def, nil
  768. }
  769. }
  770. // SSHConnection defines an ssh connection.
  771. // Each SSH connection can open several channels for SFTP or SSH commands
  772. type SSHConnection struct {
  773. id string
  774. conn net.Conn
  775. lastActivity atomic.Int64
  776. }
  777. // NewSSHConnection returns a new SSHConnection
  778. func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
  779. c := &SSHConnection{
  780. id: id,
  781. conn: conn,
  782. }
  783. c.lastActivity.Store(time.Now().UnixNano())
  784. return c
  785. }
  786. // GetID returns the ID for this SSHConnection
  787. func (c *SSHConnection) GetID() string {
  788. return c.id
  789. }
  790. // UpdateLastActivity updates last activity for this connection
  791. func (c *SSHConnection) UpdateLastActivity() {
  792. c.lastActivity.Store(time.Now().UnixNano())
  793. }
  794. // GetLastActivity returns the last connection activity
  795. func (c *SSHConnection) GetLastActivity() time.Time {
  796. return time.Unix(0, c.lastActivity.Load())
  797. }
  798. // Close closes the underlying network connection
  799. func (c *SSHConnection) Close() error {
  800. return c.conn.Close()
  801. }
  802. // ActiveConnections holds the currect active connections with the associated transfers
  803. type ActiveConnections struct {
  804. // clients contains both authenticated and estabilished connections and the ones waiting
  805. // for authentication
  806. clients clientsMap
  807. transfersCheckStatus atomic.Bool
  808. sync.RWMutex
  809. connections []ActiveConnection
  810. mapping map[string]int
  811. sshConnections []*SSHConnection
  812. sshMapping map[string]int
  813. perUserConns map[string]int
  814. }
  815. // internal method, must be called within a locked block
  816. func (conns *ActiveConnections) addUserConnection(username string) {
  817. if username == "" {
  818. return
  819. }
  820. conns.perUserConns[username]++
  821. }
  822. // internal method, must be called within a locked block
  823. func (conns *ActiveConnections) removeUserConnection(username string) {
  824. if username == "" {
  825. return
  826. }
  827. if val, ok := conns.perUserConns[username]; ok {
  828. conns.perUserConns[username]--
  829. if val > 1 {
  830. return
  831. }
  832. delete(conns.perUserConns, username)
  833. }
  834. }
  835. // GetActiveSessions returns the number of active sessions for the given username.
  836. // We return the open sessions for any protocol
  837. func (conns *ActiveConnections) GetActiveSessions(username string) int {
  838. conns.RLock()
  839. defer conns.RUnlock()
  840. return conns.perUserConns[username]
  841. }
  842. // Add adds a new connection to the active ones
  843. func (conns *ActiveConnections) Add(c ActiveConnection) error {
  844. conns.Lock()
  845. defer conns.Unlock()
  846. if username := c.GetUsername(); username != "" {
  847. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  848. if val := conns.perUserConns[username]; val >= maxSessions {
  849. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  850. }
  851. }
  852. conns.addUserConnection(username)
  853. }
  854. conns.mapping[c.GetID()] = len(conns.connections)
  855. conns.connections = append(conns.connections, c)
  856. metric.UpdateActiveConnectionsSize(len(conns.connections))
  857. logger.Debug(c.GetProtocol(), c.GetID(), "connection added, local address %q, remote address %q, num open connections: %d",
  858. c.GetLocalAddress(), c.GetRemoteAddress(), len(conns.connections))
  859. return nil
  860. }
  861. // Swap replaces an existing connection with the given one.
  862. // This method is useful if you have to change some connection details
  863. // for example for FTP is used to update the connection once the user
  864. // authenticates
  865. func (conns *ActiveConnections) Swap(c ActiveConnection) error {
  866. conns.Lock()
  867. defer conns.Unlock()
  868. if idx, ok := conns.mapping[c.GetID()]; ok {
  869. conn := conns.connections[idx]
  870. conns.removeUserConnection(conn.GetUsername())
  871. if username := c.GetUsername(); username != "" {
  872. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  873. if val, ok := conns.perUserConns[username]; ok && val >= maxSessions {
  874. conns.addUserConnection(conn.GetUsername())
  875. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  876. }
  877. }
  878. conns.addUserConnection(username)
  879. }
  880. err := conn.CloseFS()
  881. conns.connections[idx] = c
  882. logger.Debug(logSender, c.GetID(), "connection swapped, close fs error: %v", err)
  883. conn = nil
  884. return nil
  885. }
  886. return errors.New("connection to swap not found")
  887. }
  888. // Remove removes a connection from the active ones
  889. func (conns *ActiveConnections) Remove(connectionID string) {
  890. conns.Lock()
  891. defer conns.Unlock()
  892. if idx, ok := conns.mapping[connectionID]; ok {
  893. conn := conns.connections[idx]
  894. err := conn.CloseFS()
  895. lastIdx := len(conns.connections) - 1
  896. conns.connections[idx] = conns.connections[lastIdx]
  897. conns.connections[lastIdx] = nil
  898. conns.connections = conns.connections[:lastIdx]
  899. delete(conns.mapping, connectionID)
  900. if idx != lastIdx {
  901. conns.mapping[conns.connections[idx].GetID()] = idx
  902. }
  903. conns.removeUserConnection(conn.GetUsername())
  904. metric.UpdateActiveConnectionsSize(lastIdx)
  905. logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, local address %q, remote address %q close fs error: %v, num open connections: %d",
  906. conn.GetLocalAddress(), conn.GetRemoteAddress(), err, lastIdx)
  907. if conn.GetProtocol() == ProtocolFTP && conn.GetUsername() == "" && !util.Contains(ftpLoginCommands, conn.GetCommand()) {
  908. ip := util.GetIPFromRemoteAddress(conn.GetRemoteAddress())
  909. logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTried, ProtocolFTP,
  910. dataprovider.ErrNoAuthTried.Error())
  911. metric.AddNoAuthTried()
  912. AddDefenderEvent(ip, ProtocolFTP, HostEventNoLoginTried)
  913. dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTried, ip,
  914. ProtocolFTP, dataprovider.ErrNoAuthTried)
  915. plugin.Handler.NotifyLogEvent(notifier.LogEventTypeNoLoginTried, ProtocolFTP, "", ip, "",
  916. dataprovider.ErrNoAuthTried)
  917. }
  918. Config.checkPostDisconnectHook(conn.GetRemoteAddress(), conn.GetProtocol(), conn.GetUsername(),
  919. conn.GetID(), conn.GetConnectionTime())
  920. return
  921. }
  922. logger.Debug(logSender, "", "connection id %q to remove not found!", connectionID)
  923. }
  924. // Close closes an active connection.
  925. // It returns true on success
  926. func (conns *ActiveConnections) Close(connectionID, role string) bool {
  927. conns.RLock()
  928. var result bool
  929. if idx, ok := conns.mapping[connectionID]; ok {
  930. c := conns.connections[idx]
  931. if role == "" || c.GetRole() == role {
  932. defer func(conn ActiveConnection) {
  933. err := conn.Disconnect()
  934. logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
  935. }(c)
  936. result = true
  937. }
  938. }
  939. conns.RUnlock()
  940. return result
  941. }
  942. // AddSSHConnection adds a new ssh connection to the active ones
  943. func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
  944. conns.Lock()
  945. defer conns.Unlock()
  946. conns.sshMapping[c.GetID()] = len(conns.sshConnections)
  947. conns.sshConnections = append(conns.sshConnections, c)
  948. logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %d", len(conns.sshConnections))
  949. }
  950. // RemoveSSHConnection removes a connection from the active ones
  951. func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
  952. conns.Lock()
  953. defer conns.Unlock()
  954. if idx, ok := conns.sshMapping[connectionID]; ok {
  955. lastIdx := len(conns.sshConnections) - 1
  956. conns.sshConnections[idx] = conns.sshConnections[lastIdx]
  957. conns.sshConnections[lastIdx] = nil
  958. conns.sshConnections = conns.sshConnections[:lastIdx]
  959. delete(conns.sshMapping, connectionID)
  960. if idx != lastIdx {
  961. conns.sshMapping[conns.sshConnections[idx].GetID()] = idx
  962. }
  963. logger.Debug(logSender, connectionID, "ssh connection removed, num open ssh connections: %d", lastIdx)
  964. return
  965. }
  966. logger.Warn(logSender, "", "ssh connection to remove with id %q not found!", connectionID)
  967. }
  968. func (conns *ActiveConnections) checkIdles() {
  969. conns.RLock()
  970. for _, sshConn := range conns.sshConnections {
  971. idleTime := time.Since(sshConn.GetLastActivity())
  972. if idleTime > Config.idleTimeoutAsDuration {
  973. // we close an SSH connection if it has no active connections associated
  974. idToMatch := fmt.Sprintf("_%s_", sshConn.GetID())
  975. toClose := true
  976. for _, conn := range conns.connections {
  977. if strings.Contains(conn.GetID(), idToMatch) {
  978. if time.Since(conn.GetLastActivity()) <= Config.idleTimeoutAsDuration {
  979. toClose = false
  980. break
  981. }
  982. }
  983. }
  984. if toClose {
  985. defer func(c *SSHConnection) {
  986. err := c.Close()
  987. logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
  988. time.Since(c.GetLastActivity()), err)
  989. }(sshConn)
  990. }
  991. }
  992. }
  993. for _, c := range conns.connections {
  994. idleTime := time.Since(c.GetLastActivity())
  995. isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
  996. if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
  997. defer func(conn ActiveConnection) {
  998. err := conn.Disconnect()
  999. logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %v, username: %q close err: %v",
  1000. time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
  1001. }(c)
  1002. }
  1003. }
  1004. conns.RUnlock()
  1005. }
  1006. func (conns *ActiveConnections) checkTransfers() {
  1007. if conns.transfersCheckStatus.Load() {
  1008. logger.Warn(logSender, "", "the previous transfer check is still running, skipping execution")
  1009. return
  1010. }
  1011. conns.transfersCheckStatus.Store(true)
  1012. defer conns.transfersCheckStatus.Store(false)
  1013. conns.RLock()
  1014. if len(conns.connections) < 2 {
  1015. conns.RUnlock()
  1016. return
  1017. }
  1018. var wg sync.WaitGroup
  1019. logger.Debug(logSender, "", "start concurrent transfers check")
  1020. // update the current size for transfers to monitors
  1021. for _, c := range conns.connections {
  1022. for _, t := range c.GetTransfers() {
  1023. if t.HasSizeLimit {
  1024. wg.Add(1)
  1025. go func(transfer ConnectionTransfer, connID string) {
  1026. defer wg.Done()
  1027. transfersChecker.UpdateTransferCurrentSizes(transfer.ULSize, transfer.DLSize, transfer.ID, connID)
  1028. }(t, c.GetID())
  1029. }
  1030. }
  1031. }
  1032. conns.RUnlock()
  1033. logger.Debug(logSender, "", "waiting for the update of the transfers current size")
  1034. wg.Wait()
  1035. logger.Debug(logSender, "", "getting overquota transfers")
  1036. overquotaTransfers := transfersChecker.GetOverquotaTransfers()
  1037. logger.Debug(logSender, "", "number of overquota transfers: %v", len(overquotaTransfers))
  1038. if len(overquotaTransfers) == 0 {
  1039. return
  1040. }
  1041. conns.RLock()
  1042. defer conns.RUnlock()
  1043. for _, c := range conns.connections {
  1044. for _, overquotaTransfer := range overquotaTransfers {
  1045. if c.GetID() == overquotaTransfer.ConnID {
  1046. logger.Info(logSender, c.GetID(), "user %q is overquota, try to close transfer id %v",
  1047. c.GetUsername(), overquotaTransfer.TransferID)
  1048. var err error
  1049. if overquotaTransfer.TransferType == TransferDownload {
  1050. err = getReadQuotaExceededError(c.GetProtocol())
  1051. } else {
  1052. err = getQuotaExceededError(c.GetProtocol())
  1053. }
  1054. c.SignalTransferClose(overquotaTransfer.TransferID, err)
  1055. }
  1056. }
  1057. }
  1058. logger.Debug(logSender, "", "transfers check completed")
  1059. }
  1060. // AddClientConnection stores a new client connection
  1061. func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
  1062. conns.clients.add(ipAddr)
  1063. }
  1064. // RemoveClientConnection removes a disconnected client from the tracked ones
  1065. func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
  1066. conns.clients.remove(ipAddr)
  1067. }
  1068. // GetClientConnections returns the total number of client connections
  1069. func (conns *ActiveConnections) GetClientConnections() int32 {
  1070. return conns.clients.getTotal()
  1071. }
  1072. // IsNewConnectionAllowed returns an error if the maximum number of concurrent allowed
  1073. // connections is exceeded or a whitelist is defined and the specified ipAddr is not listed
  1074. // or the service is shutting down
  1075. func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr, protocol string) error {
  1076. if isShuttingDown.Load() {
  1077. return ErrShuttingDown
  1078. }
  1079. if Config.allowList != nil {
  1080. isListed, _, err := Config.allowList.IsListed(ipAddr, protocol)
  1081. if err != nil {
  1082. logger.Error(logSender, "", "unable to query allow list, connection denied, ip %q, protocol %s, err: %v",
  1083. ipAddr, protocol, err)
  1084. return ErrConnectionDenied
  1085. }
  1086. if !isListed {
  1087. return ErrConnectionDenied
  1088. }
  1089. }
  1090. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  1091. return nil
  1092. }
  1093. if Config.MaxPerHostConnections > 0 {
  1094. if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
  1095. logger.Info(logSender, "", "active connections from %s %d/%d", ipAddr, total, Config.MaxPerHostConnections)
  1096. AddDefenderEvent(ipAddr, protocol, HostEventLimitExceeded)
  1097. return ErrConnectionDenied
  1098. }
  1099. }
  1100. if Config.MaxTotalConnections > 0 {
  1101. if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
  1102. logger.Info(logSender, "", "active client connections %d/%d", total, Config.MaxTotalConnections)
  1103. return ErrConnectionDenied
  1104. }
  1105. // on a single SFTP connection we could have multiple SFTP channels or commands
  1106. // so we check the estabilished connections too
  1107. conns.RLock()
  1108. defer conns.RUnlock()
  1109. if sess := len(conns.connections); sess >= Config.MaxTotalConnections {
  1110. logger.Info(logSender, "", "active client sessions %d/%d", sess, Config.MaxTotalConnections)
  1111. return ErrConnectionDenied
  1112. }
  1113. }
  1114. return nil
  1115. }
  1116. // GetStats returns stats for active connections
  1117. func (conns *ActiveConnections) GetStats(role string) []ConnectionStatus {
  1118. conns.RLock()
  1119. defer conns.RUnlock()
  1120. stats := make([]ConnectionStatus, 0, len(conns.connections))
  1121. node := dataprovider.GetNodeName()
  1122. for _, c := range conns.connections {
  1123. if role == "" || c.GetRole() == role {
  1124. stat := ConnectionStatus{
  1125. Username: c.GetUsername(),
  1126. ConnectionID: c.GetID(),
  1127. ClientVersion: c.GetClientVersion(),
  1128. RemoteAddress: c.GetRemoteAddress(),
  1129. ConnectionTime: util.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
  1130. LastActivity: util.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
  1131. Protocol: c.GetProtocol(),
  1132. Command: c.GetCommand(),
  1133. Transfers: c.GetTransfers(),
  1134. Node: node,
  1135. }
  1136. stats = append(stats, stat)
  1137. }
  1138. }
  1139. return stats
  1140. }
  1141. // ConnectionStatus returns the status for an active connection
  1142. type ConnectionStatus struct {
  1143. // Logged in username
  1144. Username string `json:"username"`
  1145. // Unique identifier for the connection
  1146. ConnectionID string `json:"connection_id"`
  1147. // client's version string
  1148. ClientVersion string `json:"client_version,omitempty"`
  1149. // Remote address for this connection
  1150. RemoteAddress string `json:"remote_address"`
  1151. // Connection time as unix timestamp in milliseconds
  1152. ConnectionTime int64 `json:"connection_time"`
  1153. // Last activity as unix timestamp in milliseconds
  1154. LastActivity int64 `json:"last_activity"`
  1155. // Protocol for this connection
  1156. Protocol string `json:"protocol"`
  1157. // active uploads/downloads
  1158. Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
  1159. // SSH command or WebDAV method
  1160. Command string `json:"command,omitempty"`
  1161. // Node identifier, omitted for single node installations
  1162. Node string `json:"node,omitempty"`
  1163. }
  1164. // GetConnectionDuration returns the connection duration as string
  1165. func (c *ConnectionStatus) GetConnectionDuration() string {
  1166. elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  1167. return util.GetDurationAsString(elapsed)
  1168. }
  1169. // GetConnectionInfo returns connection info.
  1170. // Protocol,Client Version and RemoteAddress are returned.
  1171. func (c *ConnectionStatus) GetConnectionInfo() string {
  1172. var result strings.Builder
  1173. result.WriteString(fmt.Sprintf("%v. Client: %q From: %q", c.Protocol, c.ClientVersion, c.RemoteAddress))
  1174. if c.Command == "" {
  1175. return result.String()
  1176. }
  1177. switch c.Protocol {
  1178. case ProtocolSSH, ProtocolFTP:
  1179. result.WriteString(fmt.Sprintf(". Command: %q", c.Command))
  1180. case ProtocolWebDAV:
  1181. result.WriteString(fmt.Sprintf(". Method: %q", c.Command))
  1182. }
  1183. return result.String()
  1184. }
  1185. // GetTransfersAsString returns the active transfers as string
  1186. func (c *ConnectionStatus) GetTransfersAsString() string {
  1187. result := ""
  1188. for _, t := range c.Transfers {
  1189. if result != "" {
  1190. result += ". "
  1191. }
  1192. result += t.getConnectionTransferAsString()
  1193. }
  1194. return result
  1195. }
  1196. // ActiveQuotaScan defines an active quota scan for a user
  1197. type ActiveQuotaScan struct {
  1198. // Username to which the quota scan refers
  1199. Username string `json:"username"`
  1200. // quota scan start time as unix timestamp in milliseconds
  1201. StartTime int64 `json:"start_time"`
  1202. Role string `json:"-"`
  1203. }
  1204. // ActiveVirtualFolderQuotaScan defines an active quota scan for a virtual folder
  1205. type ActiveVirtualFolderQuotaScan struct {
  1206. // folder name to which the quota scan refers
  1207. Name string `json:"name"`
  1208. // quota scan start time as unix timestamp in milliseconds
  1209. StartTime int64 `json:"start_time"`
  1210. }
  1211. // ActiveScans holds the active quota scans
  1212. type ActiveScans struct {
  1213. sync.RWMutex
  1214. UserScans []ActiveQuotaScan
  1215. FolderScans []ActiveVirtualFolderQuotaScan
  1216. }
  1217. // GetUsersQuotaScans returns the active users quota scans
  1218. func (s *ActiveScans) GetUsersQuotaScans(role string) []ActiveQuotaScan {
  1219. s.RLock()
  1220. defer s.RUnlock()
  1221. scans := make([]ActiveQuotaScan, 0, len(s.UserScans))
  1222. for _, scan := range s.UserScans {
  1223. if role == "" || role == scan.Role {
  1224. scans = append(scans, ActiveQuotaScan{
  1225. Username: scan.Username,
  1226. StartTime: scan.StartTime,
  1227. })
  1228. }
  1229. }
  1230. return scans
  1231. }
  1232. // AddUserQuotaScan adds a user to the ones with active quota scans.
  1233. // Returns false if the user has a quota scan already running
  1234. func (s *ActiveScans) AddUserQuotaScan(username, role string) bool {
  1235. s.Lock()
  1236. defer s.Unlock()
  1237. for _, scan := range s.UserScans {
  1238. if scan.Username == username {
  1239. return false
  1240. }
  1241. }
  1242. s.UserScans = append(s.UserScans, ActiveQuotaScan{
  1243. Username: username,
  1244. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1245. Role: role,
  1246. })
  1247. return true
  1248. }
  1249. // RemoveUserQuotaScan removes a user from the ones with active quota scans.
  1250. // Returns false if the user has no active quota scans
  1251. func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
  1252. s.Lock()
  1253. defer s.Unlock()
  1254. for idx, scan := range s.UserScans {
  1255. if scan.Username == username {
  1256. lastIdx := len(s.UserScans) - 1
  1257. s.UserScans[idx] = s.UserScans[lastIdx]
  1258. s.UserScans = s.UserScans[:lastIdx]
  1259. return true
  1260. }
  1261. }
  1262. return false
  1263. }
  1264. // GetVFoldersQuotaScans returns the active quota scans for virtual folders
  1265. func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
  1266. s.RLock()
  1267. defer s.RUnlock()
  1268. scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
  1269. copy(scans, s.FolderScans)
  1270. return scans
  1271. }
  1272. // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
  1273. // Returns false if the folder has a quota scan already running
  1274. func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
  1275. s.Lock()
  1276. defer s.Unlock()
  1277. for _, scan := range s.FolderScans {
  1278. if scan.Name == folderName {
  1279. return false
  1280. }
  1281. }
  1282. s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
  1283. Name: folderName,
  1284. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1285. })
  1286. return true
  1287. }
  1288. // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
  1289. // Returns false if the folder has no active quota scans
  1290. func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
  1291. s.Lock()
  1292. defer s.Unlock()
  1293. for idx, scan := range s.FolderScans {
  1294. if scan.Name == folderName {
  1295. lastIdx := len(s.FolderScans) - 1
  1296. s.FolderScans[idx] = s.FolderScans[lastIdx]
  1297. s.FolderScans = s.FolderScans[:lastIdx]
  1298. return true
  1299. }
  1300. }
  1301. return false
  1302. }
  1303. // MetadataCheck defines an active metadata check
  1304. type MetadataCheck struct {
  1305. // Username to which the metadata check refers
  1306. Username string `json:"username"`
  1307. // check start time as unix timestamp in milliseconds
  1308. StartTime int64 `json:"start_time"`
  1309. Role string `json:"-"`
  1310. }
  1311. // MetadataChecks holds the active metadata checks
  1312. type MetadataChecks struct {
  1313. sync.RWMutex
  1314. checks []MetadataCheck
  1315. }
  1316. // Get returns the active metadata checks
  1317. func (c *MetadataChecks) Get(role string) []MetadataCheck {
  1318. c.RLock()
  1319. defer c.RUnlock()
  1320. checks := make([]MetadataCheck, 0, len(c.checks))
  1321. for _, check := range c.checks {
  1322. if role == "" || role == check.Role {
  1323. checks = append(checks, MetadataCheck{
  1324. Username: check.Username,
  1325. StartTime: check.StartTime,
  1326. })
  1327. }
  1328. }
  1329. return checks
  1330. }
  1331. // Add adds a user to the ones with active metadata checks.
  1332. // Return false if a metadata check is already active for the specified user
  1333. func (c *MetadataChecks) Add(username, role string) bool {
  1334. c.Lock()
  1335. defer c.Unlock()
  1336. for idx := range c.checks {
  1337. if c.checks[idx].Username == username {
  1338. return false
  1339. }
  1340. }
  1341. c.checks = append(c.checks, MetadataCheck{
  1342. Username: username,
  1343. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1344. Role: role,
  1345. })
  1346. return true
  1347. }
  1348. // Remove removes a user from the ones with active metadata checks
  1349. func (c *MetadataChecks) Remove(username string) bool {
  1350. c.Lock()
  1351. defer c.Unlock()
  1352. for idx := range c.checks {
  1353. if c.checks[idx].Username == username {
  1354. lastIdx := len(c.checks) - 1
  1355. c.checks[idx] = c.checks[lastIdx]
  1356. c.checks = c.checks[:lastIdx]
  1357. return true
  1358. }
  1359. }
  1360. return false
  1361. }