common.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package common defines code shared among file transfer packages and protocols
  15. package common
  16. import (
  17. "context"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. "path/filepath"
  26. "slices"
  27. "strconv"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "github.com/pires/go-proxyproto"
  33. "github.com/sftpgo/sdk/plugin/notifier"
  34. "github.com/drakkan/sftpgo/v2/internal/command"
  35. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  36. "github.com/drakkan/sftpgo/v2/internal/httpclient"
  37. "github.com/drakkan/sftpgo/v2/internal/logger"
  38. "github.com/drakkan/sftpgo/v2/internal/metric"
  39. "github.com/drakkan/sftpgo/v2/internal/plugin"
  40. "github.com/drakkan/sftpgo/v2/internal/smtp"
  41. "github.com/drakkan/sftpgo/v2/internal/util"
  42. "github.com/drakkan/sftpgo/v2/internal/version"
  43. "github.com/drakkan/sftpgo/v2/internal/vfs"
  44. )
  45. // constants
  46. const (
  47. logSender = "common"
  48. uploadLogSender = "Upload"
  49. downloadLogSender = "Download"
  50. renameLogSender = "Rename"
  51. rmdirLogSender = "Rmdir"
  52. mkdirLogSender = "Mkdir"
  53. symlinkLogSender = "Symlink"
  54. removeLogSender = "Remove"
  55. chownLogSender = "Chown"
  56. chmodLogSender = "Chmod"
  57. chtimesLogSender = "Chtimes"
  58. copyLogSender = "Copy"
  59. truncateLogSender = "Truncate"
  60. operationDownload = "download"
  61. operationUpload = "upload"
  62. operationFirstDownload = "first-download"
  63. operationFirstUpload = "first-upload"
  64. operationDelete = "delete"
  65. operationCopy = "copy"
  66. // Pre-download action name
  67. OperationPreDownload = "pre-download"
  68. // Pre-upload action name
  69. OperationPreUpload = "pre-upload"
  70. operationPreDelete = "pre-delete"
  71. operationRename = "rename"
  72. operationMkdir = "mkdir"
  73. operationRmdir = "rmdir"
  74. // SSH command action name
  75. OperationSSHCmd = "ssh_cmd"
  76. chtimesFormat = "2006-01-02T15:04:05" // YYYY-MM-DDTHH:MM:SS
  77. idleTimeoutCheckInterval = 3 * time.Minute
  78. periodicTimeoutCheckInterval = 1 * time.Minute
  79. )
  80. // Stat flags
  81. const (
  82. StatAttrUIDGID = 1
  83. StatAttrPerms = 2
  84. StatAttrTimes = 4
  85. StatAttrSize = 8
  86. )
  87. // Transfer types
  88. const (
  89. TransferUpload = iota
  90. TransferDownload
  91. )
  92. // Supported protocols
  93. const (
  94. ProtocolSFTP = "SFTP"
  95. ProtocolSCP = "SCP"
  96. ProtocolSSH = "SSH"
  97. ProtocolFTP = "FTP"
  98. ProtocolWebDAV = "DAV"
  99. ProtocolHTTP = "HTTP"
  100. ProtocolHTTPShare = "HTTPShare"
  101. ProtocolDataRetention = "DataRetention"
  102. ProtocolOIDC = "OIDC"
  103. protocolEventAction = "EventAction"
  104. )
  105. // Upload modes
  106. const (
  107. UploadModeStandard = 0
  108. UploadModeAtomic = 1
  109. UploadModeAtomicWithResume = 2
  110. UploadModeS3StoreOnError = 4
  111. UploadModeGCSStoreOnError = 8
  112. UploadModeAzureBlobStoreOnError = 16
  113. )
  114. func init() {
  115. Connections.clients = clientsMap{
  116. clients: make(map[string]int),
  117. }
  118. Connections.transfers = clientsMap{
  119. clients: make(map[string]int),
  120. }
  121. Connections.perUserConns = make(map[string]int)
  122. Connections.mapping = make(map[string]int)
  123. Connections.sshMapping = make(map[string]int)
  124. }
  125. // errors definitions
  126. var (
  127. ErrPermissionDenied = errors.New("permission denied")
  128. ErrNotExist = errors.New("no such file or directory")
  129. ErrOpUnsupported = errors.New("operation unsupported")
  130. ErrGenericFailure = errors.New("failure")
  131. ErrQuotaExceeded = errors.New("denying write due to space limit")
  132. ErrReadQuotaExceeded = errors.New("denying read due to quota limit")
  133. ErrConnectionDenied = errors.New("you are not allowed to connect")
  134. ErrNoBinding = errors.New("no binding configured")
  135. ErrCrtRevoked = errors.New("your certificate has been revoked")
  136. ErrNoCredentials = errors.New("no credential provided")
  137. ErrInternalFailure = errors.New("internal failure")
  138. ErrTransferAborted = errors.New("transfer aborted")
  139. ErrShuttingDown = errors.New("the service is shutting down")
  140. errNoTransfer = errors.New("requested transfer not found")
  141. errTransferMismatch = errors.New("transfer mismatch")
  142. )
  143. var (
  144. // Config is the configuration for the supported protocols
  145. Config Configuration
  146. // Connections is the list of active connections
  147. Connections ActiveConnections
  148. // QuotaScans is the list of active quota scans
  149. QuotaScans ActiveScans
  150. transfersChecker TransfersChecker
  151. supportedProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV,
  152. ProtocolHTTP, ProtocolHTTPShare, ProtocolOIDC}
  153. disconnHookProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP}
  154. // the map key is the protocol, for each protocol we can have multiple rate limiters
  155. rateLimiters map[string][]*rateLimiter
  156. isShuttingDown atomic.Bool
  157. ftpLoginCommands = []string{"PASS", "USER"}
  158. fnUpdateBranding func(*dataprovider.BrandingConfigs)
  159. )
  160. // SetUpdateBrandingFn sets the function to call to update branding configs.
  161. func SetUpdateBrandingFn(fn func(*dataprovider.BrandingConfigs)) {
  162. fnUpdateBranding = fn
  163. }
  164. // Initialize sets the common configuration
  165. func Initialize(c Configuration, isShared int) error {
  166. isShuttingDown.Store(false)
  167. util.SetUmask(c.Umask)
  168. version.SetConfig(c.ServerVersion)
  169. dataprovider.SetTZ(c.TZ)
  170. Config = c
  171. Config.Actions.ExecuteOn = util.RemoveDuplicates(Config.Actions.ExecuteOn, true)
  172. Config.Actions.ExecuteSync = util.RemoveDuplicates(Config.Actions.ExecuteSync, true)
  173. Config.ProxyAllowed = util.RemoveDuplicates(Config.ProxyAllowed, true)
  174. Config.idleLoginTimeout = 2 * time.Minute
  175. Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
  176. startPeriodicChecks(periodicTimeoutCheckInterval, isShared)
  177. Config.defender = nil
  178. Config.allowList = nil
  179. Config.rateLimitersList = nil
  180. rateLimiters = make(map[string][]*rateLimiter)
  181. for _, rlCfg := range c.RateLimitersConfig {
  182. if rlCfg.isEnabled() {
  183. if err := rlCfg.validate(); err != nil {
  184. return fmt.Errorf("rate limiters initialization error: %w", err)
  185. }
  186. rateLimiter := rlCfg.getLimiter()
  187. for _, protocol := range rlCfg.Protocols {
  188. rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
  189. }
  190. }
  191. }
  192. if len(rateLimiters) > 0 {
  193. rateLimitersList, err := dataprovider.NewIPList(dataprovider.IPListTypeRateLimiterSafeList)
  194. if err != nil {
  195. return fmt.Errorf("unable to initialize ratelimiters list: %w", err)
  196. }
  197. Config.rateLimitersList = rateLimitersList
  198. }
  199. if c.DefenderConfig.Enabled {
  200. if !slices.Contains(supportedDefenderDrivers, c.DefenderConfig.Driver) {
  201. return fmt.Errorf("unsupported defender driver %q", c.DefenderConfig.Driver)
  202. }
  203. var defender Defender
  204. var err error
  205. switch c.DefenderConfig.Driver {
  206. case DefenderDriverProvider:
  207. defender, err = newDBDefender(&c.DefenderConfig)
  208. default:
  209. defender, err = newInMemoryDefender(&c.DefenderConfig)
  210. }
  211. if err != nil {
  212. return fmt.Errorf("defender initialization error: %v", err)
  213. }
  214. logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
  215. Config.defender = defender
  216. }
  217. if c.AllowListStatus > 0 {
  218. allowList, err := dataprovider.NewIPList(dataprovider.IPListTypeAllowList)
  219. if err != nil {
  220. return fmt.Errorf("unable to initialize the allow list: %w", err)
  221. }
  222. logger.Info(logSender, "", "allow list initialized")
  223. Config.allowList = allowList
  224. }
  225. if err := c.initializeProxyProtocol(); err != nil {
  226. return err
  227. }
  228. vfs.SetTempPath(c.TempPath)
  229. dataprovider.SetTempPath(c.TempPath)
  230. vfs.SetAllowSelfConnections(c.AllowSelfConnections)
  231. vfs.SetRenameMode(c.RenameMode)
  232. vfs.SetReadMetadataMode(c.Metadata.Read)
  233. vfs.SetResumeMaxSize(c.ResumeMaxSize)
  234. vfs.SetUploadMode(c.UploadMode)
  235. dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
  236. transfersChecker = getTransfersChecker(isShared)
  237. return nil
  238. }
  239. // CheckClosing returns an error if the service is closing
  240. func CheckClosing() error {
  241. if isShuttingDown.Load() {
  242. return ErrShuttingDown
  243. }
  244. return nil
  245. }
  246. // WaitForTransfers waits, for the specified grace time, for currently ongoing
  247. // client-initiated transfer sessions to completes.
  248. // A zero graceTime means no wait
  249. func WaitForTransfers(graceTime int) {
  250. if graceTime == 0 {
  251. return
  252. }
  253. if isShuttingDown.Swap(true) {
  254. return
  255. }
  256. if activeHooks.Load() == 0 && getActiveConnections() == 0 {
  257. return
  258. }
  259. graceTimer := time.NewTimer(time.Duration(graceTime) * time.Second)
  260. ticker := time.NewTicker(3 * time.Second)
  261. for {
  262. select {
  263. case <-ticker.C:
  264. hooks := activeHooks.Load()
  265. logger.Info(logSender, "", "active hooks: %d", hooks)
  266. if hooks == 0 && getActiveConnections() == 0 {
  267. logger.Info(logSender, "", "no more active connections, graceful shutdown")
  268. ticker.Stop()
  269. graceTimer.Stop()
  270. return
  271. }
  272. case <-graceTimer.C:
  273. logger.Info(logSender, "", "grace time expired, hard shutdown")
  274. ticker.Stop()
  275. return
  276. }
  277. }
  278. }
  279. // getActiveConnections returns the number of connections with active transfers
  280. func getActiveConnections() int {
  281. var activeConns int
  282. Connections.RLock()
  283. for _, c := range Connections.connections {
  284. if len(c.GetTransfers()) > 0 {
  285. activeConns++
  286. }
  287. }
  288. Connections.RUnlock()
  289. logger.Info(logSender, "", "number of connections with active transfers: %d", activeConns)
  290. return activeConns
  291. }
  292. // LimitRate blocks until all the configured rate limiters
  293. // allow one event to happen.
  294. // It returns an error if the time to wait exceeds the max
  295. // allowed delay
  296. func LimitRate(protocol, ip string) (time.Duration, error) {
  297. if Config.rateLimitersList != nil {
  298. isListed, _, err := Config.rateLimitersList.IsListed(ip, protocol)
  299. if err == nil && isListed {
  300. return 0, nil
  301. }
  302. }
  303. for _, limiter := range rateLimiters[protocol] {
  304. if delay, err := limiter.Wait(ip, protocol); err != nil {
  305. logger.Debug(logSender, "", "protocol %s ip %s: %v", protocol, ip, err)
  306. return delay, err
  307. }
  308. }
  309. return 0, nil
  310. }
  311. // Reload reloads the whitelist, the IP filter plugin and the defender's block and safe lists
  312. func Reload() error {
  313. plugin.Handler.ReloadFilter()
  314. return nil
  315. }
  316. // DelayLogin applies the configured login delay
  317. func DelayLogin(err error) {
  318. if Config.defender != nil {
  319. Config.defender.DelayLogin(err)
  320. }
  321. }
  322. // IsBanned returns true if the specified IP address is banned
  323. func IsBanned(ip, protocol string) bool {
  324. if plugin.Handler.IsIPBanned(ip, protocol) {
  325. return true
  326. }
  327. if Config.defender == nil {
  328. return false
  329. }
  330. return Config.defender.IsBanned(ip, protocol)
  331. }
  332. // GetDefenderBanTime returns the ban time for the given IP
  333. // or nil if the IP is not banned or the defender is disabled
  334. func GetDefenderBanTime(ip string) (*time.Time, error) {
  335. if Config.defender == nil {
  336. return nil, nil
  337. }
  338. return Config.defender.GetBanTime(ip)
  339. }
  340. // GetDefenderHosts returns hosts that are banned or for which some violations have been detected
  341. func GetDefenderHosts() ([]dataprovider.DefenderEntry, error) {
  342. if Config.defender == nil {
  343. return nil, nil
  344. }
  345. return Config.defender.GetHosts()
  346. }
  347. // GetDefenderHost returns a defender host by ip, if any
  348. func GetDefenderHost(ip string) (dataprovider.DefenderEntry, error) {
  349. if Config.defender == nil {
  350. return dataprovider.DefenderEntry{}, errors.New("defender is disabled")
  351. }
  352. return Config.defender.GetHost(ip)
  353. }
  354. // DeleteDefenderHost removes the specified IP address from the defender lists
  355. func DeleteDefenderHost(ip string) bool {
  356. if Config.defender == nil {
  357. return false
  358. }
  359. return Config.defender.DeleteHost(ip)
  360. }
  361. // GetDefenderScore returns the score for the given IP
  362. func GetDefenderScore(ip string) (int, error) {
  363. if Config.defender == nil {
  364. return 0, nil
  365. }
  366. return Config.defender.GetScore(ip)
  367. }
  368. // AddDefenderEvent adds the specified defender event for the given IP.
  369. // Returns true if the IP is in the defender's safe list.
  370. func AddDefenderEvent(ip, protocol string, event HostEvent) bool {
  371. if Config.defender == nil {
  372. return false
  373. }
  374. return Config.defender.AddEvent(ip, protocol, event)
  375. }
  376. func reloadProviderConfigs() {
  377. configs, err := dataprovider.GetConfigs()
  378. if err != nil {
  379. logger.Error(logSender, "", "unable to load config from provider: %v", err)
  380. return
  381. }
  382. configs.SetNilsToEmpty()
  383. if fnUpdateBranding != nil {
  384. fnUpdateBranding(configs.Branding)
  385. }
  386. if err := configs.SMTP.TryDecrypt(); err != nil {
  387. logger.Error(logSender, "", "unable to decrypt smtp config: %v", err)
  388. return
  389. }
  390. smtp.Activate(configs.SMTP)
  391. }
  392. func startPeriodicChecks(duration time.Duration, isShared int) {
  393. startEventScheduler()
  394. spec := fmt.Sprintf("@every %s", duration)
  395. _, err := eventScheduler.AddFunc(spec, Connections.checkTransfers)
  396. util.PanicOnError(err)
  397. logger.Info(logSender, "", "scheduled overquota transfers check, schedule %q", spec)
  398. if isShared == 1 {
  399. logger.Info(logSender, "", "add reload configs task")
  400. _, err := eventScheduler.AddFunc("@every 10m", reloadProviderConfigs)
  401. util.PanicOnError(err)
  402. }
  403. if Config.IdleTimeout > 0 {
  404. ratio := idleTimeoutCheckInterval / periodicTimeoutCheckInterval
  405. spec = fmt.Sprintf("@every %s", duration*ratio)
  406. _, err = eventScheduler.AddFunc(spec, Connections.checkIdles)
  407. util.PanicOnError(err)
  408. logger.Info(logSender, "", "scheduled idle connections check, schedule %q", spec)
  409. }
  410. }
  411. // ActiveTransfer defines the interface for the current active transfers
  412. type ActiveTransfer interface {
  413. GetID() int64
  414. GetType() int
  415. GetSize() int64
  416. GetDownloadedSize() int64
  417. GetUploadedSize() int64
  418. GetVirtualPath() string
  419. GetStartTime() time.Time
  420. SignalClose(err error)
  421. Truncate(fsPath string, size int64) (int64, error)
  422. GetRealFsPath(fsPath string) string
  423. SetTimes(fsPath string, atime time.Time, mtime time.Time) bool
  424. GetTruncatedSize() int64
  425. HasSizeLimit() bool
  426. }
  427. // ActiveConnection defines the interface for the current active connections
  428. type ActiveConnection interface {
  429. GetID() string
  430. GetUsername() string
  431. GetRole() string
  432. GetMaxSessions() int
  433. GetLocalAddress() string
  434. GetRemoteAddress() string
  435. GetClientVersion() string
  436. GetProtocol() string
  437. GetConnectionTime() time.Time
  438. GetLastActivity() time.Time
  439. GetCommand() string
  440. Disconnect() error
  441. AddTransfer(t ActiveTransfer)
  442. RemoveTransfer(t ActiveTransfer)
  443. GetTransfers() []ConnectionTransfer
  444. SignalTransferClose(transferID int64, err error)
  445. CloseFS() error
  446. isAccessAllowed() bool
  447. }
  448. // StatAttributes defines the attributes for set stat commands
  449. type StatAttributes struct {
  450. Mode os.FileMode
  451. Atime time.Time
  452. Mtime time.Time
  453. UID int
  454. GID int
  455. Flags int
  456. Size int64
  457. }
  458. // ConnectionTransfer defines the trasfer details
  459. type ConnectionTransfer struct {
  460. ID int64 `json:"-"`
  461. OperationType string `json:"operation_type"`
  462. StartTime int64 `json:"start_time"`
  463. Size int64 `json:"size"`
  464. VirtualPath string `json:"path"`
  465. HasSizeLimit bool `json:"-"`
  466. ULSize int64 `json:"-"`
  467. DLSize int64 `json:"-"`
  468. }
  469. // MetadataConfig defines how to handle metadata for cloud storage backends
  470. type MetadataConfig struct {
  471. // If not zero the metadata will be read before downloads and will be
  472. // available in notifications
  473. Read int `json:"read" mapstructure:"read"`
  474. }
  475. // Configuration defines configuration parameters common to all supported protocols
  476. type Configuration struct {
  477. // Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
  478. // 0 means disabled
  479. IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
  480. // UploadMode 0 means standard, the files are uploaded directly to the requested path.
  481. // 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
  482. // when the client ends the upload. Atomic mode avoid problems such as a web server that
  483. // serves partial files when the files are being uploaded.
  484. // In atomic mode if there is an upload error the temporary file is deleted and so the requested
  485. // upload path will not contain a partial file.
  486. // 2 means atomic with resume support: as atomic but if there is an upload error the temporary
  487. // file is renamed to the requested path and not deleted, this way a client can reconnect and resume
  488. // the upload.
  489. // 4 means files for S3 backend are stored even if a client-side upload error is detected.
  490. // 8 means files for Google Cloud Storage backend are stored even if a client-side upload error is detected.
  491. // 16 means files for Azure Blob backend are stored even if a client-side upload error is detected.
  492. UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
  493. // Actions to execute for SFTP file operations and SSH commands
  494. Actions ProtocolActions `json:"actions" mapstructure:"actions"`
  495. // SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
  496. // 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
  497. // 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group are
  498. // silently ignored for cloud based filesystem such as S3, GCS, Azure Blob. Requests for changing
  499. // modification times are ignored for cloud based filesystem if they are not supported.
  500. SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
  501. // RenameMode defines how to handle directory renames. By default, renaming of non-empty directories
  502. // is not allowed for cloud storage providers (S3, GCS, Azure Blob). Set to 1 to enable recursive
  503. // renames for these providers, they may be slow, there is no atomic rename API like for local
  504. // filesystem, so SFTPGo will recursively list the directory contents and do a rename for each entry
  505. RenameMode int `json:"rename_mode" mapstructure:"rename_mode"`
  506. // ResumeMaxSize defines the maximum size allowed, in bytes, to resume uploads on storage backends
  507. // with immutable objects. By default, resuming uploads is not allowed for cloud storage providers
  508. // (S3, GCS, Azure Blob) because SFTPGo must rewrite the entire file.
  509. // Set to a value greater than 0 to allow resuming uploads of files smaller than or equal to the
  510. // defined size.
  511. ResumeMaxSize int64 `json:"resume_max_size" mapstructure:"resume_max_size"`
  512. // TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
  513. // If you set this option you must make sure that the defined path exists, is accessible for writing
  514. // by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise
  515. // the renaming for atomic uploads will become a copy and therefore may take a long time.
  516. // The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
  517. TempPath string `json:"temp_path" mapstructure:"temp_path"`
  518. // Support for HAProxy PROXY protocol.
  519. // If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGNIX, you can enable
  520. // the proxy protocol. It provides a convenient way to safely transport connection information
  521. // such as a client's address across multiple layers of NAT or TCP proxies to get the real
  522. // client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
  523. // - 0 means disabled
  524. // - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
  525. // - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
  526. // If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
  527. // for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
  528. ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
  529. // List of IP addresses and IP ranges allowed to send the proxy header.
  530. // If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
  531. // connection will be accepted and the header will be ignored.
  532. // If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
  533. // connection will be rejected.
  534. ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
  535. // List of IP addresses and IP ranges for which not to read the proxy header
  536. ProxySkipped []string `json:"proxy_skipped" mapstructure:"proxy_skipped"`
  537. // Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
  538. // If you define an HTTP URL it will be invoked using a `GET` request.
  539. // Please note that SFTPGo services may not yet be available when this hook is run.
  540. // Leave empty do disable.
  541. StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
  542. // Absolute path to an external program or an HTTP URL to invoke after a user connects
  543. // and before he tries to login. It allows you to reject the connection based on the source
  544. // ip address. Leave empty do disable.
  545. PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
  546. // Absolute path to an external program or an HTTP URL to invoke after an SSH/FTP connection ends.
  547. // Leave empty do disable.
  548. PostDisconnectHook string `json:"post_disconnect_hook" mapstructure:"post_disconnect_hook"`
  549. // Absolute path to an external program or an HTTP URL to invoke after a data retention check completes.
  550. // Leave empty do disable.
  551. DataRetentionHook string `json:"data_retention_hook" mapstructure:"data_retention_hook"`
  552. // Maximum number of concurrent client connections. 0 means unlimited
  553. MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
  554. // Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
  555. MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
  556. // Defines the status of the global allow list. 0 means disabled, 1 enabled.
  557. // If enabled, only the listed IPs/networks can access the configured services, all other
  558. // client connections will be dropped before they even try to authenticate.
  559. // Ensure to enable this setting only after adding some allowed ip/networks from the WebAdmin/REST API
  560. AllowListStatus int `json:"allowlist_status" mapstructure:"allowlist_status"`
  561. // Allow users on this instance to use other users/virtual folders on this instance as storage backend.
  562. // Enable this setting if you know what you are doing.
  563. AllowSelfConnections int `json:"allow_self_connections" mapstructure:"allow_self_connections"`
  564. // Defender configuration
  565. DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
  566. // Rate limiter configurations
  567. RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
  568. // Umask for new uploads. Leave blank to use the system default.
  569. Umask string `json:"umask" mapstructure:"umask"`
  570. // Defines the server version
  571. ServerVersion string `json:"server_version" mapstructure:"server_version"`
  572. // TZ defines the time zone to use for the EventManager scheduler and to
  573. // control time-based access restrictions. Set to "local" to use the
  574. // server's local time, otherwise UTC will be used.
  575. TZ string `json:"tz" mapstructure:"tz"`
  576. // Metadata configuration
  577. Metadata MetadataConfig `json:"metadata" mapstructure:"metadata"`
  578. idleTimeoutAsDuration time.Duration
  579. idleLoginTimeout time.Duration
  580. defender Defender
  581. allowList *dataprovider.IPList
  582. rateLimitersList *dataprovider.IPList
  583. proxyAllowed []func(net.IP) bool
  584. proxySkipped []func(net.IP) bool
  585. }
  586. // IsAtomicUploadEnabled returns true if atomic upload is enabled
  587. func (c *Configuration) IsAtomicUploadEnabled() bool {
  588. return c.UploadMode&UploadModeAtomic != 0 || c.UploadMode&UploadModeAtomicWithResume != 0
  589. }
  590. func (c *Configuration) initializeProxyProtocol() error {
  591. if c.ProxyProtocol > 0 {
  592. allowed, err := util.ParseAllowedIPAndRanges(c.ProxyAllowed)
  593. if err != nil {
  594. return fmt.Errorf("invalid proxy allowed: %w", err)
  595. }
  596. skipped, err := util.ParseAllowedIPAndRanges(c.ProxySkipped)
  597. if err != nil {
  598. return fmt.Errorf("invalid proxy skipped: %w", err)
  599. }
  600. Config.proxyAllowed = allowed
  601. Config.proxySkipped = skipped
  602. }
  603. return nil
  604. }
  605. // GetProxyListener returns a wrapper for the given listener that supports the
  606. // HAProxy Proxy Protocol
  607. func (c *Configuration) GetProxyListener(listener net.Listener) (net.Listener, error) {
  608. if c.ProxyProtocol > 0 {
  609. defaultPolicy := proxyproto.REQUIRE
  610. if c.ProxyProtocol == 1 {
  611. defaultPolicy = proxyproto.IGNORE
  612. }
  613. return &proxyproto.Listener{
  614. Listener: listener,
  615. ConnPolicy: getProxyPolicy(c.proxyAllowed, c.proxySkipped, defaultPolicy),
  616. ReadHeaderTimeout: 10 * time.Second,
  617. }, nil
  618. }
  619. return nil, errors.New("proxy protocol not configured")
  620. }
  621. // GetRateLimitersStatus returns the rate limiters status
  622. func (c *Configuration) GetRateLimitersStatus() (bool, []string) {
  623. enabled := false
  624. var protocols []string
  625. for _, rlCfg := range c.RateLimitersConfig {
  626. if rlCfg.isEnabled() {
  627. enabled = true
  628. protocols = append(protocols, rlCfg.Protocols...)
  629. }
  630. }
  631. return enabled, util.RemoveDuplicates(protocols, false)
  632. }
  633. // IsAllowListEnabled returns true if the global allow list is enabled
  634. func (c *Configuration) IsAllowListEnabled() bool {
  635. return c.AllowListStatus > 0
  636. }
  637. // ExecuteStartupHook runs the startup hook if defined
  638. func (c *Configuration) ExecuteStartupHook() error {
  639. if c.StartupHook == "" {
  640. return nil
  641. }
  642. if strings.HasPrefix(c.StartupHook, "http") {
  643. var url *url.URL
  644. url, err := url.Parse(c.StartupHook)
  645. if err != nil {
  646. logger.Warn(logSender, "", "Invalid startup hook %q: %v", c.StartupHook, err)
  647. return err
  648. }
  649. startTime := time.Now()
  650. resp, err := httpclient.RetryableGet(url.String())
  651. if err != nil {
  652. logger.Warn(logSender, "", "Error executing startup hook: %v", err)
  653. return err
  654. }
  655. defer resp.Body.Close()
  656. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
  657. return nil
  658. }
  659. if !filepath.IsAbs(c.StartupHook) {
  660. err := fmt.Errorf("invalid startup hook %q", c.StartupHook)
  661. logger.Warn(logSender, "", "Invalid startup hook %q", c.StartupHook)
  662. return err
  663. }
  664. startTime := time.Now()
  665. timeout, env, args := command.GetConfig(c.StartupHook, command.HookStartup)
  666. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  667. defer cancel()
  668. cmd := exec.CommandContext(ctx, c.StartupHook, args...)
  669. cmd.Env = env
  670. err := cmd.Run()
  671. logger.Debug(logSender, "", "Startup hook executed, elapsed: %s, error: %v", time.Since(startTime), err)
  672. return nil
  673. }
  674. func (c *Configuration) executePostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  675. startNewHook()
  676. defer hookEnded()
  677. ipAddr := util.GetIPFromRemoteAddress(remoteAddr)
  678. connDuration := int64(time.Since(connectionTime) / time.Millisecond)
  679. if strings.HasPrefix(c.PostDisconnectHook, "http") {
  680. var url *url.URL
  681. url, err := url.Parse(c.PostDisconnectHook)
  682. if err != nil {
  683. logger.Warn(protocol, connID, "Invalid post disconnect hook %q: %v", c.PostDisconnectHook, err)
  684. return
  685. }
  686. q := url.Query()
  687. q.Add("ip", ipAddr)
  688. q.Add("protocol", protocol)
  689. q.Add("username", username)
  690. q.Add("connection_duration", strconv.FormatInt(connDuration, 10))
  691. url.RawQuery = q.Encode()
  692. startTime := time.Now()
  693. resp, err := httpclient.RetryableGet(url.String())
  694. respCode := 0
  695. if err == nil {
  696. respCode = resp.StatusCode
  697. resp.Body.Close()
  698. }
  699. logger.Debug(protocol, connID, "Post disconnect hook response code: %v, elapsed: %v, err: %v",
  700. respCode, time.Since(startTime), err)
  701. return
  702. }
  703. if !filepath.IsAbs(c.PostDisconnectHook) {
  704. logger.Debug(protocol, connID, "invalid post disconnect hook %q", c.PostDisconnectHook)
  705. return
  706. }
  707. timeout, env, args := command.GetConfig(c.PostDisconnectHook, command.HookPostDisconnect)
  708. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  709. defer cancel()
  710. startTime := time.Now()
  711. cmd := exec.CommandContext(ctx, c.PostDisconnectHook, args...)
  712. cmd.Env = append(env,
  713. fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
  714. fmt.Sprintf("SFTPGO_CONNECTION_USERNAME=%s", username),
  715. fmt.Sprintf("SFTPGO_CONNECTION_DURATION=%d", connDuration),
  716. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
  717. err := cmd.Run()
  718. logger.Debug(protocol, connID, "Post disconnect hook executed, elapsed: %s error: %v", time.Since(startTime), err)
  719. }
  720. func (c *Configuration) checkPostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
  721. if c.PostDisconnectHook == "" {
  722. return
  723. }
  724. if !slices.Contains(disconnHookProtocols, protocol) {
  725. return
  726. }
  727. go c.executePostDisconnectHook(remoteAddr, protocol, username, connID, connectionTime)
  728. }
  729. // ExecutePostConnectHook executes the post connect hook if defined
  730. func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
  731. if c.PostConnectHook == "" {
  732. return nil
  733. }
  734. if strings.HasPrefix(c.PostConnectHook, "http") {
  735. var url *url.URL
  736. url, err := url.Parse(c.PostConnectHook)
  737. if err != nil {
  738. logger.Warn(protocol, "", "Login from ip %q denied, invalid post connect hook %q: %v",
  739. ipAddr, c.PostConnectHook, err)
  740. return getPermissionDeniedError(protocol)
  741. }
  742. q := url.Query()
  743. q.Add("ip", ipAddr)
  744. q.Add("protocol", protocol)
  745. url.RawQuery = q.Encode()
  746. resp, err := httpclient.RetryableGet(url.String())
  747. if err != nil {
  748. logger.Warn(protocol, "", "Login from ip %q denied, error executing post connect hook: %v", ipAddr, err)
  749. return getPermissionDeniedError(protocol)
  750. }
  751. defer resp.Body.Close()
  752. if resp.StatusCode != http.StatusOK {
  753. logger.Warn(protocol, "", "Login from ip %q denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
  754. return getPermissionDeniedError(protocol)
  755. }
  756. return nil
  757. }
  758. if !filepath.IsAbs(c.PostConnectHook) {
  759. err := fmt.Errorf("invalid post connect hook %q", c.PostConnectHook)
  760. logger.Warn(protocol, "", "Login from ip %q denied: %v", ipAddr, err)
  761. return getPermissionDeniedError(protocol)
  762. }
  763. timeout, env, args := command.GetConfig(c.PostConnectHook, command.HookPostConnect)
  764. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  765. defer cancel()
  766. cmd := exec.CommandContext(ctx, c.PostConnectHook, args...)
  767. cmd.Env = append(env,
  768. fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
  769. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
  770. err := cmd.Run()
  771. if err != nil {
  772. logger.Warn(protocol, "", "Login from ip %q denied, connect hook error: %v", ipAddr, err)
  773. return getPermissionDeniedError(protocol)
  774. }
  775. return nil
  776. }
  777. func getProxyPolicy(allowed, skipped []func(net.IP) bool, def proxyproto.Policy) proxyproto.ConnPolicyFunc {
  778. return func(connPolicyOptions proxyproto.ConnPolicyOptions) (proxyproto.Policy, error) {
  779. upstreamIP, err := util.GetIPFromNetAddr(connPolicyOptions.Upstream)
  780. if err != nil {
  781. // Something is wrong with the source IP, better reject the
  782. // connection.
  783. return proxyproto.REJECT, proxyproto.ErrInvalidUpstream
  784. }
  785. for _, skippedFrom := range skipped {
  786. if skippedFrom(upstreamIP) {
  787. return proxyproto.SKIP, nil
  788. }
  789. }
  790. for _, allowFrom := range allowed {
  791. if allowFrom(upstreamIP) {
  792. if def == proxyproto.REQUIRE {
  793. return proxyproto.REQUIRE, nil
  794. }
  795. return proxyproto.USE, nil
  796. }
  797. }
  798. if def == proxyproto.REQUIRE {
  799. return proxyproto.REJECT, proxyproto.ErrInvalidUpstream
  800. }
  801. return def, nil
  802. }
  803. }
  804. // SSHConnection defines an ssh connection.
  805. // Each SSH connection can open several channels for SFTP or SSH commands
  806. type SSHConnection struct {
  807. id string
  808. conn net.Conn
  809. lastActivity atomic.Int64
  810. }
  811. // NewSSHConnection returns a new SSHConnection
  812. func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
  813. c := &SSHConnection{
  814. id: id,
  815. conn: conn,
  816. }
  817. c.lastActivity.Store(time.Now().UnixNano())
  818. return c
  819. }
  820. // GetID returns the ID for this SSHConnection
  821. func (c *SSHConnection) GetID() string {
  822. return c.id
  823. }
  824. // UpdateLastActivity updates last activity for this connection
  825. func (c *SSHConnection) UpdateLastActivity() {
  826. c.lastActivity.Store(time.Now().UnixNano())
  827. }
  828. // GetLastActivity returns the last connection activity
  829. func (c *SSHConnection) GetLastActivity() time.Time {
  830. return time.Unix(0, c.lastActivity.Load())
  831. }
  832. // Close closes the underlying network connection
  833. func (c *SSHConnection) Close() error {
  834. return c.conn.Close()
  835. }
  836. // ActiveConnections holds the currect active connections with the associated transfers
  837. type ActiveConnections struct {
  838. // clients contains both authenticated and estabilished connections and the ones waiting
  839. // for authentication
  840. clients clientsMap
  841. // transfers contains active transfers, total and per-user
  842. transfers clientsMap
  843. transfersCheckStatus atomic.Bool
  844. sync.RWMutex
  845. connections []ActiveConnection
  846. mapping map[string]int
  847. sshConnections []*SSHConnection
  848. sshMapping map[string]int
  849. perUserConns map[string]int
  850. }
  851. // internal method, must be called within a locked block
  852. func (conns *ActiveConnections) addUserConnection(username string) {
  853. if username == "" {
  854. return
  855. }
  856. conns.perUserConns[username]++
  857. }
  858. // internal method, must be called within a locked block
  859. func (conns *ActiveConnections) removeUserConnection(username string) {
  860. if username == "" {
  861. return
  862. }
  863. if val, ok := conns.perUserConns[username]; ok {
  864. conns.perUserConns[username]--
  865. if val > 1 {
  866. return
  867. }
  868. delete(conns.perUserConns, username)
  869. }
  870. }
  871. // GetActiveSessions returns the number of active sessions for the given username.
  872. // We return the open sessions for any protocol
  873. func (conns *ActiveConnections) GetActiveSessions(username string) int {
  874. conns.RLock()
  875. defer conns.RUnlock()
  876. return conns.perUserConns[username]
  877. }
  878. // Add adds a new connection to the active ones
  879. func (conns *ActiveConnections) Add(c ActiveConnection) error {
  880. conns.Lock()
  881. defer conns.Unlock()
  882. if username := c.GetUsername(); username != "" {
  883. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  884. if val := conns.perUserConns[username]; val >= maxSessions {
  885. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  886. }
  887. if val := conns.transfers.getTotalFrom(username); val >= maxSessions {
  888. return fmt.Errorf("too many open transfers: %d/%d", val, maxSessions)
  889. }
  890. }
  891. conns.addUserConnection(username)
  892. }
  893. conns.mapping[c.GetID()] = len(conns.connections)
  894. conns.connections = append(conns.connections, c)
  895. metric.UpdateActiveConnectionsSize(len(conns.connections))
  896. logger.Debug(c.GetProtocol(), c.GetID(), "connection added, local address %q, remote address %q, num open connections: %d",
  897. c.GetLocalAddress(), c.GetRemoteAddress(), len(conns.connections))
  898. return nil
  899. }
  900. // Swap replaces an existing connection with the given one.
  901. // This method is useful if you have to change some connection details
  902. // for example for FTP is used to update the connection once the user
  903. // authenticates
  904. func (conns *ActiveConnections) Swap(c ActiveConnection) error {
  905. conns.Lock()
  906. defer conns.Unlock()
  907. if idx, ok := conns.mapping[c.GetID()]; ok {
  908. conn := conns.connections[idx]
  909. conns.removeUserConnection(conn.GetUsername())
  910. if username := c.GetUsername(); username != "" {
  911. if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
  912. if val, ok := conns.perUserConns[username]; ok && val >= maxSessions {
  913. conns.addUserConnection(conn.GetUsername())
  914. return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
  915. }
  916. }
  917. conns.addUserConnection(username)
  918. }
  919. err := conn.CloseFS()
  920. conns.connections[idx] = c
  921. logger.Debug(logSender, c.GetID(), "connection swapped, close fs error: %v", err)
  922. conn = nil
  923. return nil
  924. }
  925. return errors.New("connection to swap not found")
  926. }
  927. // Remove removes a connection from the active ones
  928. func (conns *ActiveConnections) Remove(connectionID string) {
  929. conns.Lock()
  930. defer conns.Unlock()
  931. if idx, ok := conns.mapping[connectionID]; ok {
  932. conn := conns.connections[idx]
  933. err := conn.CloseFS()
  934. lastIdx := len(conns.connections) - 1
  935. conns.connections[idx] = conns.connections[lastIdx]
  936. conns.connections[lastIdx] = nil
  937. conns.connections = conns.connections[:lastIdx]
  938. delete(conns.mapping, connectionID)
  939. if idx != lastIdx {
  940. conns.mapping[conns.connections[idx].GetID()] = idx
  941. }
  942. conns.removeUserConnection(conn.GetUsername())
  943. metric.UpdateActiveConnectionsSize(lastIdx)
  944. logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, local address %q, remote address %q close fs error: %v, num open connections: %d",
  945. conn.GetLocalAddress(), conn.GetRemoteAddress(), err, lastIdx)
  946. if conn.GetProtocol() == ProtocolFTP && conn.GetUsername() == "" && !slices.Contains(ftpLoginCommands, conn.GetCommand()) {
  947. ip := util.GetIPFromRemoteAddress(conn.GetRemoteAddress())
  948. logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTried, ProtocolFTP,
  949. dataprovider.ErrNoAuthTried.Error())
  950. metric.AddNoAuthTried()
  951. AddDefenderEvent(ip, ProtocolFTP, HostEventNoLoginTried)
  952. dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTried, ip,
  953. ProtocolFTP, dataprovider.ErrNoAuthTried)
  954. plugin.Handler.NotifyLogEvent(notifier.LogEventTypeNoLoginTried, ProtocolFTP, "", ip, "",
  955. dataprovider.ErrNoAuthTried)
  956. }
  957. Config.checkPostDisconnectHook(conn.GetRemoteAddress(), conn.GetProtocol(), conn.GetUsername(),
  958. conn.GetID(), conn.GetConnectionTime())
  959. return
  960. }
  961. logger.Debug(logSender, "", "connection id %q to remove not found!", connectionID)
  962. }
  963. // Close closes an active connection.
  964. // It returns true on success
  965. func (conns *ActiveConnections) Close(connectionID, role string) bool {
  966. conns.RLock()
  967. var result bool
  968. if idx, ok := conns.mapping[connectionID]; ok {
  969. c := conns.connections[idx]
  970. if role == "" || c.GetRole() == role {
  971. defer func(conn ActiveConnection) {
  972. err := conn.Disconnect()
  973. logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
  974. }(c)
  975. result = true
  976. }
  977. }
  978. conns.RUnlock()
  979. return result
  980. }
  981. // AddSSHConnection adds a new ssh connection to the active ones
  982. func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
  983. conns.Lock()
  984. defer conns.Unlock()
  985. conns.sshMapping[c.GetID()] = len(conns.sshConnections)
  986. conns.sshConnections = append(conns.sshConnections, c)
  987. logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %d", len(conns.sshConnections))
  988. }
  989. // RemoveSSHConnection removes a connection from the active ones
  990. func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
  991. conns.Lock()
  992. defer conns.Unlock()
  993. if idx, ok := conns.sshMapping[connectionID]; ok {
  994. lastIdx := len(conns.sshConnections) - 1
  995. conns.sshConnections[idx] = conns.sshConnections[lastIdx]
  996. conns.sshConnections[lastIdx] = nil
  997. conns.sshConnections = conns.sshConnections[:lastIdx]
  998. delete(conns.sshMapping, connectionID)
  999. if idx != lastIdx {
  1000. conns.sshMapping[conns.sshConnections[idx].GetID()] = idx
  1001. }
  1002. logger.Debug(logSender, connectionID, "ssh connection removed, num open ssh connections: %d", lastIdx)
  1003. return
  1004. }
  1005. logger.Warn(logSender, "", "ssh connection to remove with id %q not found!", connectionID)
  1006. }
  1007. func (conns *ActiveConnections) checkIdles() {
  1008. conns.RLock()
  1009. for _, sshConn := range conns.sshConnections {
  1010. idleTime := time.Since(sshConn.GetLastActivity())
  1011. if idleTime > Config.idleTimeoutAsDuration {
  1012. // we close an SSH connection if it has no active connections associated
  1013. idToMatch := fmt.Sprintf("_%s_", sshConn.GetID())
  1014. toClose := true
  1015. for _, conn := range conns.connections {
  1016. if strings.Contains(conn.GetID(), idToMatch) {
  1017. if time.Since(conn.GetLastActivity()) <= Config.idleTimeoutAsDuration {
  1018. toClose = false
  1019. break
  1020. }
  1021. }
  1022. }
  1023. if toClose {
  1024. defer func(c *SSHConnection) {
  1025. err := c.Close()
  1026. logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
  1027. time.Since(c.GetLastActivity()), err)
  1028. }(sshConn)
  1029. }
  1030. }
  1031. }
  1032. for _, c := range conns.connections {
  1033. idleTime := time.Since(c.GetLastActivity())
  1034. isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
  1035. if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
  1036. defer func(conn ActiveConnection) {
  1037. err := conn.Disconnect()
  1038. logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %s, username: %q close err: %v",
  1039. time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
  1040. }(c)
  1041. } else if !c.isAccessAllowed() {
  1042. defer func(conn ActiveConnection) {
  1043. err := conn.Disconnect()
  1044. logger.Info(conn.GetProtocol(), conn.GetID(), "access conditions not met for user: %q close connection err: %v",
  1045. conn.GetUsername(), err)
  1046. }(c)
  1047. }
  1048. }
  1049. conns.RUnlock()
  1050. }
  1051. func (conns *ActiveConnections) checkTransfers() {
  1052. if conns.transfersCheckStatus.Load() {
  1053. logger.Warn(logSender, "", "the previous transfer check is still running, skipping execution")
  1054. return
  1055. }
  1056. conns.transfersCheckStatus.Store(true)
  1057. defer conns.transfersCheckStatus.Store(false)
  1058. conns.RLock()
  1059. if len(conns.connections) < 2 {
  1060. conns.RUnlock()
  1061. return
  1062. }
  1063. var wg sync.WaitGroup
  1064. logger.Debug(logSender, "", "start concurrent transfers check")
  1065. // update the current size for transfers to monitors
  1066. for _, c := range conns.connections {
  1067. for _, t := range c.GetTransfers() {
  1068. if t.HasSizeLimit {
  1069. wg.Add(1)
  1070. go func(transfer ConnectionTransfer, connID string) {
  1071. defer wg.Done()
  1072. transfersChecker.UpdateTransferCurrentSizes(transfer.ULSize, transfer.DLSize, transfer.ID, connID)
  1073. }(t, c.GetID())
  1074. }
  1075. }
  1076. }
  1077. conns.RUnlock()
  1078. logger.Debug(logSender, "", "waiting for the update of the transfers current size")
  1079. wg.Wait()
  1080. logger.Debug(logSender, "", "getting overquota transfers")
  1081. overquotaTransfers := transfersChecker.GetOverquotaTransfers()
  1082. logger.Debug(logSender, "", "number of overquota transfers: %v", len(overquotaTransfers))
  1083. if len(overquotaTransfers) == 0 {
  1084. return
  1085. }
  1086. conns.RLock()
  1087. defer conns.RUnlock()
  1088. for _, c := range conns.connections {
  1089. for _, overquotaTransfer := range overquotaTransfers {
  1090. if c.GetID() == overquotaTransfer.ConnID {
  1091. logger.Info(logSender, c.GetID(), "user %q is overquota, try to close transfer id %v",
  1092. c.GetUsername(), overquotaTransfer.TransferID)
  1093. var err error
  1094. if overquotaTransfer.TransferType == TransferDownload {
  1095. err = getReadQuotaExceededError(c.GetProtocol())
  1096. } else {
  1097. err = getQuotaExceededError(c.GetProtocol())
  1098. }
  1099. c.SignalTransferClose(overquotaTransfer.TransferID, err)
  1100. }
  1101. }
  1102. }
  1103. logger.Debug(logSender, "", "transfers check completed")
  1104. }
  1105. // AddClientConnection stores a new client connection
  1106. func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
  1107. conns.clients.add(ipAddr)
  1108. }
  1109. // RemoveClientConnection removes a disconnected client from the tracked ones
  1110. func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
  1111. conns.clients.remove(ipAddr)
  1112. }
  1113. // GetClientConnections returns the total number of client connections
  1114. func (conns *ActiveConnections) GetClientConnections() int32 {
  1115. return conns.clients.getTotal()
  1116. }
  1117. // GetTotalTransfers returns the total number of active transfers
  1118. func (conns *ActiveConnections) GetTotalTransfers() int32 {
  1119. return conns.transfers.getTotal()
  1120. }
  1121. // IsNewTransferAllowed returns an error if the maximum number of concurrent allowed
  1122. // transfers is exceeded
  1123. func (conns *ActiveConnections) IsNewTransferAllowed(username string) error {
  1124. if isShuttingDown.Load() {
  1125. return ErrShuttingDown
  1126. }
  1127. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  1128. return nil
  1129. }
  1130. if Config.MaxPerHostConnections > 0 {
  1131. if transfers := conns.transfers.getTotalFrom(username); transfers >= Config.MaxPerHostConnections {
  1132. logger.Info(logSender, "", "active transfers from user %q: %d/%d", username, transfers, Config.MaxPerHostConnections)
  1133. return ErrConnectionDenied
  1134. }
  1135. }
  1136. if Config.MaxTotalConnections > 0 {
  1137. if transfers := conns.transfers.getTotal(); transfers >= int32(Config.MaxTotalConnections) {
  1138. logger.Info(logSender, "", "active transfers %d/%d", transfers, Config.MaxTotalConnections)
  1139. return ErrConnectionDenied
  1140. }
  1141. }
  1142. return nil
  1143. }
  1144. // IsNewConnectionAllowed returns an error if the maximum number of concurrent allowed
  1145. // connections is exceeded or a whitelist is defined and the specified ipAddr is not listed
  1146. // or the service is shutting down
  1147. func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr, protocol string) error {
  1148. if isShuttingDown.Load() {
  1149. return ErrShuttingDown
  1150. }
  1151. if Config.allowList != nil {
  1152. isListed, _, err := Config.allowList.IsListed(ipAddr, protocol)
  1153. if err != nil {
  1154. logger.Error(logSender, "", "unable to query allow list, connection denied, ip %q, protocol %s, err: %v",
  1155. ipAddr, protocol, err)
  1156. return ErrConnectionDenied
  1157. }
  1158. if !isListed {
  1159. return ErrConnectionDenied
  1160. }
  1161. }
  1162. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  1163. return nil
  1164. }
  1165. if Config.MaxPerHostConnections > 0 {
  1166. if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
  1167. if !AddDefenderEvent(ipAddr, protocol, HostEventLimitExceeded) {
  1168. logger.Warn(logSender, "", "connection denied, active connections from IP %q: %d/%d",
  1169. ipAddr, total, Config.MaxPerHostConnections)
  1170. return ErrConnectionDenied
  1171. }
  1172. logger.Info(logSender, "", "active connections from safe IP %q: %d", ipAddr, total)
  1173. }
  1174. }
  1175. if Config.MaxTotalConnections > 0 {
  1176. if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
  1177. logger.Info(logSender, "", "active client connections %d/%d", total, Config.MaxTotalConnections)
  1178. return ErrConnectionDenied
  1179. }
  1180. // on a single SFTP connection we could have multiple SFTP channels or commands
  1181. // so we check the estabilished connections and active uploads too
  1182. if transfers := conns.transfers.getTotal(); transfers >= int32(Config.MaxTotalConnections) {
  1183. logger.Info(logSender, "", "active transfers %d/%d", transfers, Config.MaxTotalConnections)
  1184. return ErrConnectionDenied
  1185. }
  1186. conns.RLock()
  1187. defer conns.RUnlock()
  1188. if sess := len(conns.connections); sess >= Config.MaxTotalConnections {
  1189. logger.Info(logSender, "", "active client sessions %d/%d", sess, Config.MaxTotalConnections)
  1190. return ErrConnectionDenied
  1191. }
  1192. }
  1193. return nil
  1194. }
  1195. // GetStats returns stats for active connections
  1196. func (conns *ActiveConnections) GetStats(role string) []ConnectionStatus {
  1197. conns.RLock()
  1198. defer conns.RUnlock()
  1199. stats := make([]ConnectionStatus, 0, len(conns.connections))
  1200. node := dataprovider.GetNodeName()
  1201. for _, c := range conns.connections {
  1202. if role == "" || c.GetRole() == role {
  1203. stat := ConnectionStatus{
  1204. Username: c.GetUsername(),
  1205. ConnectionID: c.GetID(),
  1206. ClientVersion: c.GetClientVersion(),
  1207. RemoteAddress: c.GetRemoteAddress(),
  1208. ConnectionTime: util.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
  1209. LastActivity: util.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
  1210. CurrentTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1211. Protocol: c.GetProtocol(),
  1212. Command: c.GetCommand(),
  1213. Transfers: c.GetTransfers(),
  1214. Node: node,
  1215. }
  1216. stats = append(stats, stat)
  1217. }
  1218. }
  1219. return stats
  1220. }
  1221. // ConnectionStatus returns the status for an active connection
  1222. type ConnectionStatus struct {
  1223. // Logged in username
  1224. Username string `json:"username"`
  1225. // Unique identifier for the connection
  1226. ConnectionID string `json:"connection_id"`
  1227. // client's version string
  1228. ClientVersion string `json:"client_version,omitempty"`
  1229. // Remote address for this connection
  1230. RemoteAddress string `json:"remote_address"`
  1231. // Connection time as unix timestamp in milliseconds
  1232. ConnectionTime int64 `json:"connection_time"`
  1233. // Last activity as unix timestamp in milliseconds
  1234. LastActivity int64 `json:"last_activity"`
  1235. // Current time as unix timestamp in milliseconds
  1236. CurrentTime int64 `json:"current_time"`
  1237. // Protocol for this connection
  1238. Protocol string `json:"protocol"`
  1239. // active uploads/downloads
  1240. Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
  1241. // SSH command or WebDAV method
  1242. Command string `json:"command,omitempty"`
  1243. // Node identifier, omitted for single node installations
  1244. Node string `json:"node,omitempty"`
  1245. }
  1246. // ActiveQuotaScan defines an active quota scan for a user
  1247. type ActiveQuotaScan struct {
  1248. // Username to which the quota scan refers
  1249. Username string `json:"username"`
  1250. // quota scan start time as unix timestamp in milliseconds
  1251. StartTime int64 `json:"start_time"`
  1252. Role string `json:"-"`
  1253. }
  1254. // ActiveVirtualFolderQuotaScan defines an active quota scan for a virtual folder
  1255. type ActiveVirtualFolderQuotaScan struct {
  1256. // folder name to which the quota scan refers
  1257. Name string `json:"name"`
  1258. // quota scan start time as unix timestamp in milliseconds
  1259. StartTime int64 `json:"start_time"`
  1260. }
  1261. // ActiveScans holds the active quota scans
  1262. type ActiveScans struct {
  1263. sync.RWMutex
  1264. UserScans []ActiveQuotaScan
  1265. FolderScans []ActiveVirtualFolderQuotaScan
  1266. }
  1267. // GetUsersQuotaScans returns the active users quota scans
  1268. func (s *ActiveScans) GetUsersQuotaScans(role string) []ActiveQuotaScan {
  1269. s.RLock()
  1270. defer s.RUnlock()
  1271. scans := make([]ActiveQuotaScan, 0, len(s.UserScans))
  1272. for _, scan := range s.UserScans {
  1273. if role == "" || role == scan.Role {
  1274. scans = append(scans, ActiveQuotaScan{
  1275. Username: scan.Username,
  1276. StartTime: scan.StartTime,
  1277. })
  1278. }
  1279. }
  1280. return scans
  1281. }
  1282. // AddUserQuotaScan adds a user to the ones with active quota scans.
  1283. // Returns false if the user has a quota scan already running
  1284. func (s *ActiveScans) AddUserQuotaScan(username, role string) bool {
  1285. s.Lock()
  1286. defer s.Unlock()
  1287. for _, scan := range s.UserScans {
  1288. if scan.Username == username {
  1289. return false
  1290. }
  1291. }
  1292. s.UserScans = append(s.UserScans, ActiveQuotaScan{
  1293. Username: username,
  1294. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1295. Role: role,
  1296. })
  1297. return true
  1298. }
  1299. // RemoveUserQuotaScan removes a user from the ones with active quota scans.
  1300. // Returns false if the user has no active quota scans
  1301. func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
  1302. s.Lock()
  1303. defer s.Unlock()
  1304. for idx, scan := range s.UserScans {
  1305. if scan.Username == username {
  1306. lastIdx := len(s.UserScans) - 1
  1307. s.UserScans[idx] = s.UserScans[lastIdx]
  1308. s.UserScans = s.UserScans[:lastIdx]
  1309. return true
  1310. }
  1311. }
  1312. return false
  1313. }
  1314. // GetVFoldersQuotaScans returns the active quota scans for virtual folders
  1315. func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
  1316. s.RLock()
  1317. defer s.RUnlock()
  1318. scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
  1319. copy(scans, s.FolderScans)
  1320. return scans
  1321. }
  1322. // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
  1323. // Returns false if the folder has a quota scan already running
  1324. func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
  1325. s.Lock()
  1326. defer s.Unlock()
  1327. for _, scan := range s.FolderScans {
  1328. if scan.Name == folderName {
  1329. return false
  1330. }
  1331. }
  1332. s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
  1333. Name: folderName,
  1334. StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
  1335. })
  1336. return true
  1337. }
  1338. // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
  1339. // Returns false if the folder has no active quota scans
  1340. func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
  1341. s.Lock()
  1342. defer s.Unlock()
  1343. for idx, scan := range s.FolderScans {
  1344. if scan.Name == folderName {
  1345. lastIdx := len(s.FolderScans) - 1
  1346. s.FolderScans[idx] = s.FolderScans[lastIdx]
  1347. s.FolderScans = s.FolderScans[:lastIdx]
  1348. return true
  1349. }
  1350. }
  1351. return false
  1352. }