eventmanager.go 88 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "time"
  37. "github.com/bmatcuk/doublestar/v4"
  38. "github.com/klauspost/compress/zip"
  39. "github.com/robfig/cron/v3"
  40. "github.com/rs/xid"
  41. "github.com/sftpgo/sdk"
  42. "github.com/wneessen/go-mail"
  43. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/plugin"
  46. "github.com/drakkan/sftpgo/v2/internal/smtp"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/vfs"
  49. )
  50. const (
  51. ipBlockedEventName = "IP Blocked"
  52. maxAttachmentsSize = int64(10 * 1024 * 1024)
  53. objDataPlaceholder = "{{ObjectData}}"
  54. )
  55. // Supported IDP login events
  56. const (
  57. IDPLoginUser = "IDP login user"
  58. IDPLoginAdmin = "IDP login admin"
  59. )
  60. var (
  61. // eventManager handle the supported event rules actions
  62. eventManager eventRulesContainer
  63. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  64. )
  65. func init() {
  66. eventManager = eventRulesContainer{
  67. schedulesMapping: make(map[string][]cron.EntryID),
  68. // arbitrary maximum number of concurrent asynchronous tasks,
  69. // each task could execute multiple actions
  70. concurrencyGuard: make(chan struct{}, 200),
  71. }
  72. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  73. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  74. eventManager.handleProviderEvent(EventParams{
  75. Name: executor,
  76. ObjectName: objectName,
  77. Event: operation,
  78. Status: 1,
  79. ObjectType: objectType,
  80. IP: ip,
  81. Role: role,
  82. Timestamp: time.Now().UnixNano(),
  83. Object: object,
  84. })
  85. })
  86. }
  87. // HandleCertificateEvent checks and executes action rules for certificate events
  88. func HandleCertificateEvent(params EventParams) {
  89. eventManager.handleCertificateEvent(params)
  90. }
  91. // HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider
  92. func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
  93. return eventManager.handleIDPLoginEvent(params, customFields)
  94. }
  95. // eventRulesContainer stores event rules by trigger
  96. type eventRulesContainer struct {
  97. sync.RWMutex
  98. lastLoad atomic.Int64
  99. FsEvents []dataprovider.EventRule
  100. ProviderEvents []dataprovider.EventRule
  101. Schedules []dataprovider.EventRule
  102. IPBlockedEvents []dataprovider.EventRule
  103. CertificateEvents []dataprovider.EventRule
  104. IPDLoginEvents []dataprovider.EventRule
  105. schedulesMapping map[string][]cron.EntryID
  106. concurrencyGuard chan struct{}
  107. }
  108. func (r *eventRulesContainer) addAsyncTask() {
  109. activeHooks.Add(1)
  110. r.concurrencyGuard <- struct{}{}
  111. }
  112. func (r *eventRulesContainer) removeAsyncTask() {
  113. activeHooks.Add(-1)
  114. <-r.concurrencyGuard
  115. }
  116. func (r *eventRulesContainer) getLastLoadTime() int64 {
  117. return r.lastLoad.Load()
  118. }
  119. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  120. r.lastLoad.Store(modTime)
  121. }
  122. // RemoveRule deletes the rule with the specified name
  123. func (r *eventRulesContainer) RemoveRule(name string) {
  124. r.Lock()
  125. defer r.Unlock()
  126. r.removeRuleInternal(name)
  127. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  128. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  129. }
  130. func (r *eventRulesContainer) removeRuleInternal(name string) {
  131. for idx := range r.FsEvents {
  132. if r.FsEvents[idx].Name == name {
  133. lastIdx := len(r.FsEvents) - 1
  134. r.FsEvents[idx] = r.FsEvents[lastIdx]
  135. r.FsEvents = r.FsEvents[:lastIdx]
  136. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  137. return
  138. }
  139. }
  140. for idx := range r.ProviderEvents {
  141. if r.ProviderEvents[idx].Name == name {
  142. lastIdx := len(r.ProviderEvents) - 1
  143. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  144. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  145. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  146. return
  147. }
  148. }
  149. for idx := range r.IPBlockedEvents {
  150. if r.IPBlockedEvents[idx].Name == name {
  151. lastIdx := len(r.IPBlockedEvents) - 1
  152. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  153. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  154. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  155. return
  156. }
  157. }
  158. for idx := range r.CertificateEvents {
  159. if r.CertificateEvents[idx].Name == name {
  160. lastIdx := len(r.CertificateEvents) - 1
  161. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  162. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  163. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  164. return
  165. }
  166. }
  167. for idx := range r.IPDLoginEvents {
  168. if r.IPDLoginEvents[idx].Name == name {
  169. lastIdx := len(r.IPDLoginEvents) - 1
  170. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  171. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  172. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  173. return
  174. }
  175. }
  176. for idx := range r.Schedules {
  177. if r.Schedules[idx].Name == name {
  178. if schedules, ok := r.schedulesMapping[name]; ok {
  179. for _, entryID := range schedules {
  180. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  181. eventScheduler.Remove(entryID)
  182. }
  183. delete(r.schedulesMapping, name)
  184. }
  185. lastIdx := len(r.Schedules) - 1
  186. r.Schedules[idx] = r.Schedules[lastIdx]
  187. r.Schedules = r.Schedules[:lastIdx]
  188. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  189. return
  190. }
  191. }
  192. }
  193. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  194. r.removeRuleInternal(rule.Name)
  195. if rule.DeletedAt > 0 {
  196. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  197. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  198. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  199. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  200. }
  201. return
  202. }
  203. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  204. return
  205. }
  206. switch rule.Trigger {
  207. case dataprovider.EventTriggerFsEvent:
  208. r.FsEvents = append(r.FsEvents, rule)
  209. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  210. case dataprovider.EventTriggerProviderEvent:
  211. r.ProviderEvents = append(r.ProviderEvents, rule)
  212. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  213. case dataprovider.EventTriggerIPBlocked:
  214. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  215. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  216. case dataprovider.EventTriggerCertificate:
  217. r.CertificateEvents = append(r.CertificateEvents, rule)
  218. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  219. case dataprovider.EventTriggerIDPLogin:
  220. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  221. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  222. case dataprovider.EventTriggerSchedule:
  223. for _, schedule := range rule.Conditions.Schedules {
  224. cronSpec := schedule.GetCronSpec()
  225. job := &eventCronJob{
  226. ruleName: dataprovider.ConvertName(rule.Name),
  227. }
  228. entryID, err := eventScheduler.AddJob(cronSpec, job)
  229. if err != nil {
  230. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  231. return
  232. }
  233. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  234. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  235. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  236. }
  237. r.Schedules = append(r.Schedules, rule)
  238. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  239. default:
  240. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  241. }
  242. }
  243. func (r *eventRulesContainer) loadRules() {
  244. eventManagerLog(logger.LevelDebug, "loading updated rules")
  245. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  246. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  247. if err != nil {
  248. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  249. return
  250. }
  251. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  252. if len(rules) > 0 {
  253. r.Lock()
  254. defer r.Unlock()
  255. for _, rule := range rules {
  256. r.addUpdateRuleInternal(rule)
  257. }
  258. }
  259. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  260. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  261. r.setLastLoadTime(modTime)
  262. }
  263. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  264. switch conditions.IDPLoginEvent {
  265. case dataprovider.IDPLoginUser:
  266. if params.Event != IDPLoginUser {
  267. return false
  268. }
  269. case dataprovider.IDPLoginAdmin:
  270. if params.Event != IDPLoginAdmin {
  271. return false
  272. }
  273. }
  274. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  275. }
  276. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  277. if !util.Contains(conditions.ProviderEvents, params.Event) {
  278. return false
  279. }
  280. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  281. return false
  282. }
  283. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  284. return false
  285. }
  286. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  287. return false
  288. }
  289. return true
  290. }
  291. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  292. if !util.Contains(conditions.FsEvents, params.Event) {
  293. return false
  294. }
  295. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  296. return false
  297. }
  298. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  299. return false
  300. }
  301. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  302. return false
  303. }
  304. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  305. return false
  306. }
  307. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  308. return false
  309. }
  310. if params.Event == operationUpload || params.Event == operationDownload {
  311. if conditions.Options.MinFileSize > 0 {
  312. if params.FileSize < conditions.Options.MinFileSize {
  313. return false
  314. }
  315. }
  316. if conditions.Options.MaxFileSize > 0 {
  317. if params.FileSize > conditions.Options.MaxFileSize {
  318. return false
  319. }
  320. }
  321. }
  322. return true
  323. }
  324. // hasFsRules returns true if there are any rules for filesystem event triggers
  325. func (r *eventRulesContainer) hasFsRules() bool {
  326. r.RLock()
  327. defer r.RUnlock()
  328. return len(r.FsEvents) > 0
  329. }
  330. // handleFsEvent executes the rules actions defined for the specified event.
  331. // The boolean parameter indicates whether a sync action was executed
  332. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  333. if params.Protocol == protocolEventAction {
  334. return false, nil
  335. }
  336. r.RLock()
  337. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  338. for _, rule := range r.FsEvents {
  339. if r.checkFsEventMatch(&rule.Conditions, &params) {
  340. if err := rule.CheckActionsConsistency(""); err != nil {
  341. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  342. rule.Name, err, params.Event)
  343. continue
  344. }
  345. hasSyncActions := false
  346. for _, action := range rule.Actions {
  347. if action.Options.ExecuteSync {
  348. hasSyncActions = true
  349. break
  350. }
  351. }
  352. if hasSyncActions {
  353. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  354. } else {
  355. rulesAsync = append(rulesAsync, rule)
  356. }
  357. }
  358. }
  359. r.RUnlock()
  360. params.sender = params.Name
  361. if len(rulesAsync) > 0 {
  362. go executeAsyncRulesActions(rulesAsync, params)
  363. }
  364. if len(rulesWithSyncActions) > 0 {
  365. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  366. }
  367. return false, nil
  368. }
  369. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  370. *dataprovider.Admin, error,
  371. ) {
  372. r.RLock()
  373. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  374. for _, rule := range r.IPDLoginEvents {
  375. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  376. if err := rule.CheckActionsConsistency(""); err != nil {
  377. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  378. rule.Name, err, params.Event)
  379. continue
  380. }
  381. hasSyncActions := false
  382. for _, action := range rule.Actions {
  383. if action.Options.ExecuteSync {
  384. hasSyncActions = true
  385. break
  386. }
  387. }
  388. if hasSyncActions {
  389. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  390. } else {
  391. rulesAsync = append(rulesAsync, rule)
  392. }
  393. }
  394. }
  395. r.RUnlock()
  396. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  397. return nil, nil, nil
  398. }
  399. params.addIDPCustomFields(customFields)
  400. if len(rulesWithSyncActions) > 1 {
  401. var ruleNames []string
  402. for _, r := range rulesWithSyncActions {
  403. ruleNames = append(ruleNames, r.Name)
  404. }
  405. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  406. }
  407. if len(rulesAsync) > 0 {
  408. go executeAsyncRulesActions(rulesAsync, params)
  409. }
  410. if len(rulesWithSyncActions) > 0 {
  411. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  412. }
  413. return nil, nil, nil
  414. }
  415. // username is populated for user objects
  416. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  417. r.RLock()
  418. defer r.RUnlock()
  419. var rules []dataprovider.EventRule
  420. for _, rule := range r.ProviderEvents {
  421. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  422. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  423. rules = append(rules, rule)
  424. } else {
  425. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  426. rule.Name, err, params.Event, params.ObjectType)
  427. }
  428. }
  429. }
  430. if len(rules) > 0 {
  431. params.sender = params.ObjectName
  432. go executeAsyncRulesActions(rules, params)
  433. }
  434. }
  435. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  436. r.RLock()
  437. defer r.RUnlock()
  438. if len(r.IPBlockedEvents) == 0 {
  439. return
  440. }
  441. var rules []dataprovider.EventRule
  442. for _, rule := range r.IPBlockedEvents {
  443. if err := rule.CheckActionsConsistency(""); err == nil {
  444. rules = append(rules, rule)
  445. } else {
  446. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  447. rule.Name, err, params.Event)
  448. }
  449. }
  450. if len(rules) > 0 {
  451. go executeAsyncRulesActions(rules, params)
  452. }
  453. }
  454. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  455. r.RLock()
  456. defer r.RUnlock()
  457. if len(r.CertificateEvents) == 0 {
  458. return
  459. }
  460. var rules []dataprovider.EventRule
  461. for _, rule := range r.CertificateEvents {
  462. if err := rule.CheckActionsConsistency(""); err == nil {
  463. rules = append(rules, rule)
  464. } else {
  465. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  466. rule.Name, err, params.Event)
  467. }
  468. }
  469. if len(rules) > 0 {
  470. go executeAsyncRulesActions(rules, params)
  471. }
  472. }
  473. type executedRetentionCheck struct {
  474. Username string
  475. ActionName string
  476. Results []folderRetentionCheckResult
  477. }
  478. // EventParams defines the supported event parameters
  479. type EventParams struct {
  480. Name string
  481. Groups []sdk.GroupMapping
  482. Event string
  483. Status int
  484. VirtualPath string
  485. FsPath string
  486. VirtualTargetPath string
  487. FsTargetPath string
  488. ObjectName string
  489. ObjectType string
  490. FileSize int64
  491. Elapsed int64
  492. Protocol string
  493. IP string
  494. Role string
  495. Timestamp int64
  496. IDPCustomFields *map[string]string
  497. Object plugin.Renderer
  498. sender string
  499. updateStatusFromError bool
  500. errors []string
  501. retentionChecks []executedRetentionCheck
  502. }
  503. func (p *EventParams) getACopy() *EventParams {
  504. params := *p
  505. params.errors = make([]string, len(p.errors))
  506. copy(params.errors, p.errors)
  507. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  508. for _, c := range p.retentionChecks {
  509. executedCheck := executedRetentionCheck{
  510. Username: c.Username,
  511. ActionName: c.ActionName,
  512. }
  513. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  514. copy(executedCheck.Results, c.Results)
  515. retentionChecks = append(retentionChecks, executedCheck)
  516. }
  517. params.retentionChecks = retentionChecks
  518. if p.IDPCustomFields != nil {
  519. fields := make(map[string]string)
  520. for k, v := range *p.IDPCustomFields {
  521. fields[k] = v
  522. }
  523. params.IDPCustomFields = &fields
  524. }
  525. return &params
  526. }
  527. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  528. if customFields == nil {
  529. return
  530. }
  531. fields := make(map[string]string)
  532. for k, v := range *customFields {
  533. switch val := v.(type) {
  534. case string:
  535. fields[k] = val
  536. }
  537. }
  538. p.IDPCustomFields = &fields
  539. }
  540. // AddError adds a new error to the event params and update the status if needed
  541. func (p *EventParams) AddError(err error) {
  542. if err == nil {
  543. return
  544. }
  545. if p.updateStatusFromError && p.Status == 1 {
  546. p.Status = 2
  547. }
  548. p.errors = append(p.errors, err.Error())
  549. }
  550. func (p *EventParams) setBackupParams(backupPath string) {
  551. if p.sender != "" {
  552. return
  553. }
  554. p.sender = dataprovider.ActionExecutorSystem
  555. p.FsPath = backupPath
  556. p.ObjectName = filepath.Base(backupPath)
  557. p.VirtualPath = "/" + p.ObjectName
  558. p.Timestamp = time.Now().UnixNano()
  559. info, err := os.Stat(backupPath)
  560. if err == nil {
  561. p.FileSize = info.Size()
  562. }
  563. }
  564. func (p *EventParams) getStatusString() string {
  565. switch p.Status {
  566. case 1:
  567. return "OK"
  568. default:
  569. return "KO"
  570. }
  571. }
  572. // getUsers returns users with group settings not applied
  573. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  574. if p.sender == "" {
  575. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeUsers})
  576. if err != nil {
  577. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  578. return nil, errors.New("unable to get users")
  579. }
  580. return dump.Users, nil
  581. }
  582. user, err := p.getUserFromSender()
  583. if err != nil {
  584. return nil, err
  585. }
  586. return []dataprovider.User{user}, nil
  587. }
  588. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  589. if p.sender == dataprovider.ActionExecutorSystem {
  590. return dataprovider.User{
  591. BaseUser: sdk.BaseUser{
  592. Status: 1,
  593. Username: p.sender,
  594. HomeDir: dataprovider.GetBackupsPath(),
  595. Permissions: map[string][]string{
  596. "/": {dataprovider.PermAny},
  597. },
  598. },
  599. }, nil
  600. }
  601. user, err := dataprovider.UserExists(p.sender, "")
  602. if err != nil {
  603. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  604. return user, fmt.Errorf("error getting user %q", p.sender)
  605. }
  606. return user, nil
  607. }
  608. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  609. if p.sender == "" {
  610. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeFolders})
  611. return dump.Folders, err
  612. }
  613. folder, err := dataprovider.GetFolderByName(p.sender)
  614. if err != nil {
  615. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  616. }
  617. return []vfs.BaseVirtualFolder{folder}, nil
  618. }
  619. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  620. if len(p.retentionChecks) == 0 {
  621. return nil, errors.New("no data retention report available")
  622. }
  623. var b bytes.Buffer
  624. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  625. return nil, err
  626. }
  627. return b.Bytes(), nil
  628. }
  629. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  630. var n int64
  631. wr := zip.NewWriter(w)
  632. for _, check := range p.retentionChecks {
  633. data, err := getCSVRetentionReport(check.Results)
  634. if err != nil {
  635. return n, fmt.Errorf("unable to get CSV report: %w", err)
  636. }
  637. dataSize := int64(len(data))
  638. n += dataSize
  639. // we suppose a 3:1 compression ratio
  640. if n > (maxAttachmentsSize * 3) {
  641. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  642. util.ByteCountIEC(n))
  643. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  644. }
  645. fh := &zip.FileHeader{
  646. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  647. Method: zip.Deflate,
  648. Modified: time.Now().UTC(),
  649. }
  650. f, err := wr.CreateHeader(fh)
  651. if err != nil {
  652. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  653. }
  654. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  655. if err != nil {
  656. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  657. }
  658. }
  659. if err := wr.Close(); err != nil {
  660. return n, fmt.Errorf("unable to close zip writer: %w", err)
  661. }
  662. return n, nil
  663. }
  664. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  665. if len(p.retentionChecks) == 0 {
  666. return nil, errors.New("no data retention report available")
  667. }
  668. return &mail.File{
  669. Name: "retention-reports.zip",
  670. Header: make(map[string][]string),
  671. Writer: p.writeCompressedDataRetentionReports,
  672. }, nil
  673. }
  674. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  675. if jsonEscaped {
  676. return util.JSONEscape(val)
  677. }
  678. return val
  679. }
  680. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  681. replacements := []string{
  682. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  683. "{{Event}}", p.Event,
  684. "{{Status}}", fmt.Sprintf("%d", p.Status),
  685. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  686. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  687. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  688. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  689. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  690. "{{ObjectType}}", p.ObjectType,
  691. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  692. "{{Elapsed}}", fmt.Sprintf("%d", p.Elapsed),
  693. "{{Protocol}}", p.Protocol,
  694. "{{IP}}", p.IP,
  695. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  696. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  697. "{{StatusString}}", p.getStatusString(),
  698. }
  699. if p.VirtualPath != "" {
  700. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  701. }
  702. if p.VirtualTargetPath != "" {
  703. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  704. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  705. }
  706. if len(p.errors) > 0 {
  707. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  708. } else {
  709. replacements = append(replacements, "{{ErrorString}}", "")
  710. }
  711. replacements = append(replacements, objDataPlaceholder, "")
  712. if addObjectData {
  713. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  714. if err == nil {
  715. replacements[len(replacements)-1] = p.getStringReplacement(string(data), jsonEscaped)
  716. }
  717. }
  718. if p.IDPCustomFields != nil {
  719. for k, v := range *p.IDPCustomFields {
  720. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  721. }
  722. }
  723. return replacements
  724. }
  725. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  726. var b bytes.Buffer
  727. csvWriter := csv.NewWriter(&b)
  728. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  729. "elapsed (ms)", "info", "error"})
  730. if err != nil {
  731. return nil, err
  732. }
  733. for _, result := range results {
  734. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  735. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  736. result.Info, result.Error})
  737. if err != nil {
  738. return nil, err
  739. }
  740. }
  741. csvWriter.Flush()
  742. err = csvWriter.Error()
  743. return b.Bytes(), err
  744. }
  745. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
  746. numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
  747. ) error {
  748. var fsDstPath string
  749. var errDstFs error
  750. errWrite := w.Close()
  751. targetPath := virtualSourcePath
  752. if virtualTargetPath != "" {
  753. targetPath = virtualTargetPath
  754. var fsDst vfs.Fs
  755. fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
  756. if errTransfer != nil && errDstFs == nil {
  757. // try to remove a partial file on error. If this fails, we can't do anything
  758. errRemove := fsDst.Remove(fsDstPath, false)
  759. conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
  760. }
  761. }
  762. info, err := conn.doStatInternal(targetPath, 0, false, false)
  763. if err == nil {
  764. updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
  765. var fsSrcPath string
  766. var errSrcFs error
  767. if virtualSourcePath != "" {
  768. _, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
  769. }
  770. if errSrcFs == nil && errDstFs == nil {
  771. elapsed := time.Since(startTime).Nanoseconds() / 1000000
  772. if errTransfer == nil {
  773. errTransfer = errWrite
  774. }
  775. if operation == operationCopy {
  776. logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
  777. "", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
  778. }
  779. ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed) //nolint:errcheck
  780. }
  781. } else {
  782. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
  783. }
  784. if errTransfer != nil {
  785. return errTransfer
  786. }
  787. return errWrite
  788. }
  789. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  790. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  791. if err != nil {
  792. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  793. return
  794. }
  795. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  796. if vfolder.IsIncludedInUserQuota() {
  797. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  798. }
  799. }
  800. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  801. if numFiles == 0 {
  802. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  803. return conn.GetPermissionDeniedError()
  804. }
  805. } else {
  806. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  807. return conn.GetPermissionDeniedError()
  808. }
  809. }
  810. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  811. if !q.HasSpace {
  812. return conn.GetQuotaExceededError()
  813. }
  814. if expectedSize != -1 {
  815. sizeDiff := expectedSize - truncatedSize
  816. if sizeDiff > 0 {
  817. remainingSize := q.GetRemainingSize()
  818. if remainingSize > 0 && remainingSize < sizeDiff {
  819. return conn.GetQuotaExceededError()
  820. }
  821. }
  822. }
  823. return nil
  824. }
  825. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  826. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  827. if err != nil {
  828. return nil, 0, 0, nil, err
  829. }
  830. var truncatedSize, fileSize int64
  831. numFiles := 1
  832. isFileOverwrite := false
  833. info, err := fs.Lstat(fsPath)
  834. if err == nil {
  835. fileSize = info.Size()
  836. if info.IsDir() {
  837. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  838. }
  839. if info.Mode().IsRegular() {
  840. isFileOverwrite = true
  841. truncatedSize = fileSize
  842. }
  843. numFiles = 0
  844. }
  845. if err != nil && !fs.IsNotExist(err) {
  846. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  847. }
  848. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  849. return nil, numFiles, truncatedSize, nil, err
  850. }
  851. f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1))
  852. if err != nil {
  853. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  854. }
  855. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  856. if isFileOverwrite {
  857. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  858. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  859. truncatedSize = 0
  860. }
  861. }
  862. if cancelFn == nil {
  863. cancelFn = func() {}
  864. }
  865. if f != nil {
  866. return f, numFiles, truncatedSize, cancelFn, nil
  867. }
  868. return w, numFiles, truncatedSize, cancelFn, nil
  869. }
  870. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  871. if entryPath == wr.Name {
  872. // skip the archive itself
  873. return nil
  874. }
  875. info, err := conn.DoStat(entryPath, 1, false)
  876. if err != nil {
  877. eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
  878. return err
  879. }
  880. entryName, err := getZipEntryName(entryPath, baseDir)
  881. if err != nil {
  882. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  883. return err
  884. }
  885. if _, ok := wr.Entries[entryName]; ok {
  886. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  887. return nil
  888. }
  889. wr.Entries[entryName] = true
  890. if info.IsDir() {
  891. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  892. Name: entryName + "/",
  893. Method: zip.Deflate,
  894. Modified: info.ModTime(),
  895. })
  896. if err != nil {
  897. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  898. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  899. }
  900. contents, err := conn.ListDir(entryPath)
  901. if err != nil {
  902. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  903. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  904. }
  905. for _, info := range contents {
  906. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  907. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  908. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  909. return err
  910. }
  911. }
  912. return nil
  913. }
  914. if !info.Mode().IsRegular() {
  915. // we only allow regular files
  916. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  917. return nil
  918. }
  919. reader, cancelFn, err := getFileReader(conn, entryPath)
  920. if err != nil {
  921. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  922. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  923. }
  924. defer cancelFn()
  925. defer reader.Close()
  926. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  927. Name: entryName,
  928. Method: zip.Deflate,
  929. Modified: info.ModTime(),
  930. })
  931. if err != nil {
  932. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  933. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  934. }
  935. _, err = io.Copy(f, reader)
  936. return err
  937. }
  // getZipEntryName returns entryPath relative to baseDir, without a leading
  // slash, to be used as name for a zip entry.
  //
  // An error is returned if entryPath is not inside baseDir. The check is done
  // on path segment boundaries, so "/ab/c" is outside "/a" even if "/a" is a
  // string prefix of it. An entryPath equal to baseDir yields an empty name and
  // an empty baseDir matches any path.
  func getZipEntryName(entryPath, baseDir string) (string, error) {
  	if !strings.HasPrefix(entryPath, baseDir) {
  		return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  	}
  	rel := strings.TrimPrefix(entryPath, baseDir)
  	// the prefix must end on a separator: either baseDir ends with "/" or
  	// what follows it is empty or starts with "/"
  	onBoundary := rel == "" || baseDir == "" || strings.HasSuffix(baseDir, "/") || strings.HasPrefix(rel, "/")
  	if !onBoundary {
  		return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  	}
  	return strings.TrimPrefix(rel, "/"), nil
  }
  945. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  946. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  947. return nil, nil, conn.GetPermissionDeniedError()
  948. }
  949. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  950. if err != nil {
  951. return nil, nil, err
  952. }
  953. f, r, cancelFn, err := fs.Open(fsPath, 0)
  954. if err != nil {
  955. return nil, nil, conn.GetFsError(fs, err)
  956. }
  957. if cancelFn == nil {
  958. cancelFn = func() {}
  959. }
  960. if f != nil {
  961. return f, cancelFn, nil
  962. }
  963. return r, cancelFn, nil
  964. }
  965. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  966. reader, cancelFn, err := getFileReader(conn, virtualPath)
  967. if err != nil {
  968. return err
  969. }
  970. defer cancelFn()
  971. defer reader.Close()
  972. _, err = io.Copy(w, reader)
  973. return err
  974. }
  975. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  976. return func(w io.Writer) (int64, error) {
  977. reader, cancelFn, err := getFileReader(conn, virtualPath)
  978. if err != nil {
  979. return 0, err
  980. }
  981. defer cancelFn()
  982. defer reader.Close()
  983. return io.CopyN(w, reader, size)
  984. }
  985. }
  986. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  987. var files []*mail.File
  988. totalSize := int64(0)
  989. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  990. info, err := conn.DoStat(virtualPath, 0, false)
  991. if err != nil {
  992. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  993. }
  994. if !info.Mode().IsRegular() {
  995. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  996. }
  997. totalSize += info.Size()
  998. if totalSize > maxAttachmentsSize {
  999. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  1000. }
  1001. files = append(files, &mail.File{
  1002. Name: path.Base(virtualPath),
  1003. Header: make(map[string][]string),
  1004. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1005. })
  1006. }
  1007. return files, nil
  1008. }
  // replaceWithReplacer applies replacer to input. Inputs that do not contain
  // "{{" cannot hold a placeholder and are returned untouched without invoking
  // the replacer.
  func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  	if strings.Contains(input, "{{") {
  		return replacer.Replace(input)
  	}
  	return input
  }
  1015. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1016. var matched bool
  1017. var err error
  1018. if strings.Contains(p.Pattern, "**") {
  1019. matched, err = doublestar.Match(p.Pattern, name)
  1020. } else {
  1021. matched, err = path.Match(p.Pattern, name)
  1022. }
  1023. if err != nil {
  1024. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1025. return false
  1026. }
  1027. if p.InverseMatch {
  1028. return !matched
  1029. }
  1030. return matched
  1031. }
  1032. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1033. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1034. return false
  1035. }
  1036. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1037. return false
  1038. }
  1039. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1040. return false
  1041. }
  1042. return true
  1043. }
  1044. // checkConditionPatterns returns false if patterns are defined and no match is found
  1045. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1046. if len(patterns) == 0 {
  1047. return true
  1048. }
  1049. for _, p := range patterns {
  1050. if checkEventConditionPattern(p, name) {
  1051. return true
  1052. }
  1053. }
  1054. return false
  1055. }
  1056. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1057. if len(patterns) == 0 {
  1058. return true
  1059. }
  1060. for _, group := range groups {
  1061. for _, p := range patterns {
  1062. if checkEventConditionPattern(p, group.Name) {
  1063. return true
  1064. }
  1065. }
  1066. }
  1067. return false
  1068. }
  1069. func getHTTPRuleActionEndpoint(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1070. u, err := url.Parse(c.Endpoint)
  1071. if err != nil {
  1072. return "", fmt.Errorf("invalid endpoint: %w", err)
  1073. }
  1074. if strings.Contains(u.Path, "{{") {
  1075. pathComponents := strings.Split(u.Path, "/")
  1076. for idx := range pathComponents {
  1077. part := replaceWithReplacer(pathComponents[idx], replacer)
  1078. if part != pathComponents[idx] {
  1079. pathComponents[idx] = url.PathEscape(part)
  1080. }
  1081. }
  1082. u.Path = ""
  1083. u = u.JoinPath(pathComponents...)
  1084. }
  1085. if len(c.QueryParameters) > 0 {
  1086. q := u.Query()
  1087. for _, keyVal := range c.QueryParameters {
  1088. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1089. }
  1090. u.RawQuery = q.Encode()
  1091. }
  1092. return u.String(), nil
  1093. }
  1094. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1095. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1096. ) error {
  1097. partWriter, err := m.CreatePart(h)
  1098. if err != nil {
  1099. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1100. return err
  1101. }
  1102. if part.Body != "" {
  1103. cType := h.Get("Content-Type")
  1104. if part.Body != objDataPlaceholder && strings.Contains(strings.ToLower(cType), "application/json") {
  1105. replacements := params.getStringReplacements(addObjectData, true)
  1106. jsonReplacer := strings.NewReplacer(replacements...)
  1107. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, jsonReplacer)))
  1108. } else {
  1109. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  1110. }
  1111. if err != nil {
  1112. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1113. return err
  1114. }
  1115. return nil
  1116. }
  1117. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1118. data, err := params.getCompressedDataRetentionReport()
  1119. if err != nil {
  1120. return err
  1121. }
  1122. _, err = partWriter.Write(data)
  1123. if err != nil {
  1124. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1125. return err
  1126. }
  1127. return nil
  1128. }
  1129. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1130. if err != nil {
  1131. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1132. return err
  1133. }
  1134. return nil
  1135. }
  1136. func jsonEscapeRuleActionBody(c *dataprovider.EventActionHTTPConfig) bool {
  1137. return c.Body != objDataPlaceholder && c.HasJSONBody()
  1138. }
  1139. func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1140. cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
  1141. ) (io.Reader, string, error) {
  1142. var body io.Reader
  1143. if c.Method == http.MethodGet {
  1144. return body, "", nil
  1145. }
  1146. if c.Body != "" {
  1147. if c.Body == dataprovider.RetentionReportPlaceHolder {
  1148. data, err := params.getCompressedDataRetentionReport()
  1149. if err != nil {
  1150. return body, "", err
  1151. }
  1152. return bytes.NewBuffer(data), "", nil
  1153. }
  1154. if jsonEscapeRuleActionBody(c) {
  1155. replacements := params.getStringReplacements(addObjectData, true)
  1156. jsonReplacer := strings.NewReplacer(replacements...)
  1157. return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
  1158. }
  1159. return bytes.NewBufferString(replaceWithReplacer(c.Body, replacer)), "", nil
  1160. }
  1161. if len(c.Parts) > 0 {
  1162. r, w := io.Pipe()
  1163. m := multipart.NewWriter(w)
  1164. var conn *BaseConnection
  1165. if user.Username != "" {
  1166. var err error
  1167. user, err = getUserForEventAction(user)
  1168. if err != nil {
  1169. return body, "", err
  1170. }
  1171. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1172. err = user.CheckFsRoot(connectionID)
  1173. if err != nil {
  1174. user.CloseFs() //nolint:errcheck
  1175. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  1176. user.Username, err)
  1177. }
  1178. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1179. }
  1180. go func() {
  1181. defer w.Close()
  1182. defer user.CloseFs() //nolint:errcheck
  1183. for _, part := range c.Parts {
  1184. h := make(textproto.MIMEHeader)
  1185. if part.Body != "" {
  1186. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  1187. } else {
  1188. h.Set("Content-Disposition",
  1189. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  1190. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  1191. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  1192. if contentType == "" {
  1193. contentType = "application/octet-stream"
  1194. }
  1195. h.Set("Content-Type", contentType)
  1196. }
  1197. for _, keyVal := range part.Headers {
  1198. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1199. }
  1200. if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
  1201. cancel()
  1202. return
  1203. }
  1204. }
  1205. m.Close()
  1206. }()
  1207. return r, m.FormDataContentType(), nil
  1208. }
  1209. return body, "", nil
  1210. }
  1211. func setHTTPReqHeaders(req *http.Request, c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1212. contentType string,
  1213. ) {
  1214. if contentType != "" {
  1215. req.Header.Set("Content-Type", contentType)
  1216. }
  1217. if c.Username != "" || c.Password.GetPayload() != "" {
  1218. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1219. }
  1220. for _, keyVal := range c.Headers {
  1221. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1222. }
  1223. }
  1224. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1225. if err := c.TryDecryptPassword(); err != nil {
  1226. return err
  1227. }
  1228. addObjectData := false
  1229. if params.Object != nil {
  1230. addObjectData = c.HasObjectData()
  1231. }
  1232. replacements := params.getStringReplacements(addObjectData, false)
  1233. replacer := strings.NewReplacer(replacements...)
  1234. endpoint, err := getHTTPRuleActionEndpoint(&c, replacer)
  1235. if err != nil {
  1236. return err
  1237. }
  1238. ctx, cancel := c.GetContext()
  1239. defer cancel()
  1240. var user dataprovider.User
  1241. if c.HasMultipartFiles() {
  1242. user, err = params.getUserFromSender()
  1243. if err != nil {
  1244. return err
  1245. }
  1246. }
  1247. body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
  1248. if err != nil {
  1249. return err
  1250. }
  1251. if body != nil {
  1252. rc, ok := body.(io.ReadCloser)
  1253. if ok {
  1254. defer rc.Close()
  1255. }
  1256. }
  1257. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1258. if err != nil {
  1259. return err
  1260. }
  1261. setHTTPReqHeaders(req, &c, replacer, contentType)
  1262. client := c.GetHTTPClient()
  1263. defer client.CloseIdleConnections()
  1264. startTime := time.Now()
  1265. resp, err := client.Do(req)
  1266. if err != nil {
  1267. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1268. endpoint, time.Since(startTime), err)
  1269. return fmt.Errorf("error sending HTTP request: %w", err)
  1270. }
  1271. defer resp.Body.Close()
  1272. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1273. endpoint, time.Since(startTime), resp.StatusCode)
  1274. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1275. if rb, err := io.ReadAll(io.LimitReader(resp.Body, 2048)); err == nil {
  1276. eventManagerLog(logger.LevelDebug, "error notification response from endpoint %q: %s", endpoint, string(rb))
  1277. }
  1278. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1279. }
  1280. return nil
  1281. }
  1282. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1283. addObjectData := false
  1284. if params.Object != nil {
  1285. for _, k := range c.EnvVars {
  1286. if strings.Contains(k.Value, objDataPlaceholder) {
  1287. addObjectData = true
  1288. break
  1289. }
  1290. }
  1291. }
  1292. replacements := params.getStringReplacements(addObjectData, false)
  1293. replacer := strings.NewReplacer(replacements...)
  1294. args := make([]string, 0, len(c.Args))
  1295. for _, arg := range c.Args {
  1296. args = append(args, replaceWithReplacer(arg, replacer))
  1297. }
  1298. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1299. defer cancel()
  1300. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1301. cmd.Env = []string{}
  1302. for _, keyVal := range c.EnvVars {
  1303. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1304. }
  1305. startTime := time.Now()
  1306. err := cmd.Run()
  1307. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1308. c.Cmd, time.Since(startTime), err)
  1309. return err
  1310. }
  1311. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1312. addObjectData := false
  1313. if params.Object != nil {
  1314. if strings.Contains(c.Body, objDataPlaceholder) {
  1315. addObjectData = true
  1316. }
  1317. }
  1318. replacements := params.getStringReplacements(addObjectData, false)
  1319. replacer := strings.NewReplacer(replacements...)
  1320. body := replaceWithReplacer(c.Body, replacer)
  1321. subject := replaceWithReplacer(c.Subject, replacer)
  1322. recipients := make([]string, 0, len(c.Recipients))
  1323. for _, recipient := range c.Recipients {
  1324. recipients = append(recipients, replaceWithReplacer(recipient, replacer))
  1325. }
  1326. startTime := time.Now()
  1327. var files []*mail.File
  1328. fileAttachments := make([]string, 0, len(c.Attachments))
  1329. for _, attachment := range c.Attachments {
  1330. if attachment == dataprovider.RetentionReportPlaceHolder {
  1331. f, err := params.getRetentionReportsAsMailAttachment()
  1332. if err != nil {
  1333. return err
  1334. }
  1335. files = append(files, f)
  1336. continue
  1337. }
  1338. fileAttachments = append(fileAttachments, attachment)
  1339. }
  1340. if len(fileAttachments) > 0 {
  1341. user, err := params.getUserFromSender()
  1342. if err != nil {
  1343. return err
  1344. }
  1345. user, err = getUserForEventAction(user)
  1346. if err != nil {
  1347. return err
  1348. }
  1349. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1350. err = user.CheckFsRoot(connectionID)
  1351. defer user.CloseFs() //nolint:errcheck
  1352. if err != nil {
  1353. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1354. }
  1355. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1356. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1357. if err != nil {
  1358. return err
  1359. }
  1360. files = append(files, res...)
  1361. }
  1362. err := smtp.SendEmail(recipients, subject, body, smtp.EmailContentType(c.ContentType), files...)
  1363. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1364. time.Since(startTime), err)
  1365. if err != nil {
  1366. return fmt.Errorf("unable to send email: %w", err)
  1367. }
  1368. return nil
  1369. }
  1370. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1371. err := user.LoadAndApplyGroupSettings()
  1372. if err != nil {
  1373. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1374. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1375. }
  1376. user.UploadDataTransfer = 0
  1377. user.UploadBandwidth = 0
  1378. user.DownloadBandwidth = 0
  1379. user.Filters.DisableFsChecks = false
  1380. user.Filters.FilePatterns = nil
  1381. user.Filters.BandwidthLimits = nil
  1382. user.Filters.DataTransferLimits = nil
  1383. for k := range user.Permissions {
  1384. user.Permissions[k] = []string{dataprovider.PermAny}
  1385. }
  1386. return user, nil
  1387. }
  1388. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1389. results := make([]string, 0, len(paths))
  1390. for _, p := range paths {
  1391. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1392. }
  1393. return util.RemoveDuplicates(results, false)
  1394. }
  1395. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1396. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1397. if err != nil {
  1398. return err
  1399. }
  1400. return conn.RemoveFile(fs, fsPath, item, info)
  1401. }
  1402. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1403. user, err := getUserForEventAction(user)
  1404. if err != nil {
  1405. return err
  1406. }
  1407. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1408. err = user.CheckFsRoot(connectionID)
  1409. defer user.CloseFs() //nolint:errcheck
  1410. if err != nil {
  1411. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1412. }
  1413. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1414. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1415. info, err := conn.DoStat(item, 0, false)
  1416. if err != nil {
  1417. if conn.IsNotExistError(err) {
  1418. continue
  1419. }
  1420. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1421. }
  1422. if info.IsDir() {
  1423. if err = conn.RemoveDir(item); err != nil {
  1424. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1425. }
  1426. } else {
  1427. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1428. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1429. }
  1430. }
  1431. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1432. }
  1433. return nil
  1434. }
  1435. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1436. conditions dataprovider.ConditionOptions, params *EventParams,
  1437. ) error {
  1438. users, err := params.getUsers()
  1439. if err != nil {
  1440. return fmt.Errorf("unable to get users: %w", err)
  1441. }
  1442. var failures []string
  1443. executed := 0
  1444. for _, user := range users {
  1445. // if sender is set, the conditions have already been evaluated
  1446. if params.sender == "" {
  1447. if !checkUserConditionOptions(&user, &conditions) {
  1448. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1449. user.Username)
  1450. continue
  1451. }
  1452. }
  1453. executed++
  1454. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1455. params.AddError(err)
  1456. failures = append(failures, user.Username)
  1457. }
  1458. }
  1459. if len(failures) > 0 {
  1460. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1461. }
  1462. if executed == 0 {
  1463. eventManagerLog(logger.LevelError, "no delete executed")
  1464. return errors.New("no delete executed")
  1465. }
  1466. return nil
  1467. }
  1468. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1469. user, err := getUserForEventAction(user)
  1470. if err != nil {
  1471. return err
  1472. }
  1473. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1474. err = user.CheckFsRoot(connectionID)
  1475. defer user.CloseFs() //nolint:errcheck
  1476. if err != nil {
  1477. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1478. }
  1479. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1480. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1481. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1482. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1483. }
  1484. if err = conn.createDirIfMissing(item); err != nil {
  1485. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1486. }
  1487. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1488. }
  1489. return nil
  1490. }
  1491. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1492. conditions dataprovider.ConditionOptions, params *EventParams,
  1493. ) error {
  1494. users, err := params.getUsers()
  1495. if err != nil {
  1496. return fmt.Errorf("unable to get users: %w", err)
  1497. }
  1498. var failures []string
  1499. executed := 0
  1500. for _, user := range users {
  1501. // if sender is set, the conditions have already been evaluated
  1502. if params.sender == "" {
  1503. if !checkUserConditionOptions(&user, &conditions) {
  1504. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1505. user.Username)
  1506. continue
  1507. }
  1508. }
  1509. executed++
  1510. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1511. failures = append(failures, user.Username)
  1512. }
  1513. }
  1514. if len(failures) > 0 {
  1515. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1516. }
  1517. if executed == 0 {
  1518. eventManagerLog(logger.LevelError, "no mkdir executed")
  1519. return errors.New("no mkdir executed")
  1520. }
  1521. return nil
  1522. }
  1523. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1524. user dataprovider.User,
  1525. ) error {
  1526. user, err := getUserForEventAction(user)
  1527. if err != nil {
  1528. return err
  1529. }
  1530. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1531. err = user.CheckFsRoot(connectionID)
  1532. defer user.CloseFs() //nolint:errcheck
  1533. if err != nil {
  1534. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1535. }
  1536. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1537. for _, item := range renames {
  1538. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1539. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1540. if err = conn.renameInternal(source, target, true); err != nil {
  1541. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1542. }
  1543. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1544. }
  1545. return nil
  1546. }
  1547. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1548. user dataprovider.User,
  1549. ) error {
  1550. user, err := getUserForEventAction(user)
  1551. if err != nil {
  1552. return err
  1553. }
  1554. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1555. err = user.CheckFsRoot(connectionID)
  1556. defer user.CloseFs() //nolint:errcheck
  1557. if err != nil {
  1558. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1559. }
  1560. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1561. for _, item := range copy {
  1562. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1563. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1564. if strings.HasSuffix(item.Key, "/") {
  1565. source += "/"
  1566. }
  1567. if strings.HasSuffix(item.Value, "/") {
  1568. target += "/"
  1569. }
  1570. if err = conn.Copy(source, target); err != nil {
  1571. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1572. }
  1573. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1574. }
  1575. return nil
  1576. }
  1577. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1578. user dataprovider.User,
  1579. ) error {
  1580. user, err := getUserForEventAction(user)
  1581. if err != nil {
  1582. return err
  1583. }
  1584. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1585. err = user.CheckFsRoot(connectionID)
  1586. defer user.CloseFs() //nolint:errcheck
  1587. if err != nil {
  1588. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1589. }
  1590. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1591. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1592. if _, err = conn.DoStat(item, 0, false); err != nil {
  1593. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1594. }
  1595. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1596. }
  1597. return nil
  1598. }
  1599. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1600. conditions dataprovider.ConditionOptions, params *EventParams,
  1601. ) error {
  1602. users, err := params.getUsers()
  1603. if err != nil {
  1604. return fmt.Errorf("unable to get users: %w", err)
  1605. }
  1606. var failures []string
  1607. executed := 0
  1608. for _, user := range users {
  1609. // if sender is set, the conditions have already been evaluated
  1610. if params.sender == "" {
  1611. if !checkUserConditionOptions(&user, &conditions) {
  1612. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1613. user.Username)
  1614. continue
  1615. }
  1616. }
  1617. executed++
  1618. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1619. failures = append(failures, user.Username)
  1620. params.AddError(err)
  1621. }
  1622. }
  1623. if len(failures) > 0 {
  1624. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1625. }
  1626. if executed == 0 {
  1627. eventManagerLog(logger.LevelError, "no rename executed")
  1628. return errors.New("no rename executed")
  1629. }
  1630. return nil
  1631. }
  1632. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1633. conditions dataprovider.ConditionOptions, params *EventParams,
  1634. ) error {
  1635. users, err := params.getUsers()
  1636. if err != nil {
  1637. return fmt.Errorf("unable to get users: %w", err)
  1638. }
  1639. var failures []string
  1640. var executed int
  1641. for _, user := range users {
  1642. // if sender is set, the conditions have already been evaluated
  1643. if params.sender == "" {
  1644. if !checkUserConditionOptions(&user, &conditions) {
  1645. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1646. user.Username)
  1647. continue
  1648. }
  1649. }
  1650. executed++
  1651. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1652. failures = append(failures, user.Username)
  1653. params.AddError(err)
  1654. }
  1655. }
  1656. if len(failures) > 0 {
  1657. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1658. }
  1659. if executed == 0 {
  1660. eventManagerLog(logger.LevelError, "no copy executed")
  1661. return errors.New("no copy executed")
  1662. }
  1663. return nil
  1664. }
  1665. func getArchiveBaseDir(paths []string) string {
  1666. var parentDirs []string
  1667. for _, p := range paths {
  1668. parentDirs = append(parentDirs, path.Dir(p))
  1669. }
  1670. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1671. baseDir := "/"
  1672. if len(parentDirs) == 1 {
  1673. baseDir = parentDirs[0]
  1674. }
  1675. return baseDir
  1676. }
  1677. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1678. if info.IsDir() {
  1679. var dirSize int64
  1680. entries, err := conn.ListDir(p)
  1681. if err != nil {
  1682. return 0, err
  1683. }
  1684. for _, entry := range entries {
  1685. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1686. if err != nil {
  1687. return 0, err
  1688. }
  1689. dirSize += size
  1690. }
  1691. return dirSize, nil
  1692. }
  1693. if info.Mode().IsRegular() {
  1694. return info.Size(), nil
  1695. }
  1696. return 0, nil
  1697. }
  1698. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1699. q, _ := conn.HasSpace(false, false, zipPath)
  1700. if q.HasSpace && q.GetRemainingSize() > 0 {
  1701. var size int64
  1702. for _, item := range paths {
  1703. info, err := conn.DoStat(item, 1, false)
  1704. if err != nil {
  1705. return size, err
  1706. }
  1707. itemSize, err := getSizeForPath(conn, item, info)
  1708. if err != nil {
  1709. return size, err
  1710. }
  1711. size += itemSize
  1712. }
  1713. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1714. // we assume the zip size will be half of the real size
  1715. return size / 2, nil
  1716. }
  1717. return -1, nil
  1718. }
  1719. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1720. user dataprovider.User,
  1721. ) error {
  1722. user, err := getUserForEventAction(user)
  1723. if err != nil {
  1724. return err
  1725. }
  1726. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1727. err = user.CheckFsRoot(connectionID)
  1728. defer user.CloseFs() //nolint:errcheck
  1729. if err != nil {
  1730. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1731. }
  1732. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1733. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1734. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1735. paths := make([]string, 0, len(c.Paths))
  1736. for idx := range c.Paths {
  1737. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1738. if p == name {
  1739. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1740. }
  1741. paths = append(paths, p)
  1742. }
  1743. paths = util.RemoveDuplicates(paths, false)
  1744. estimatedSize, err := estimateZipSize(conn, name, paths)
  1745. if err != nil {
  1746. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1747. return fmt.Errorf("unable to estimate archive size: %w", err)
  1748. }
  1749. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1750. if err != nil {
  1751. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1752. return fmt.Errorf("unable to create archive: %w", err)
  1753. }
  1754. defer cancelFn()
  1755. baseDir := getArchiveBaseDir(paths)
  1756. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1757. zipWriter := &zipWriterWrapper{
  1758. Name: name,
  1759. Writer: zip.NewWriter(writer),
  1760. Entries: make(map[string]bool),
  1761. }
  1762. startTime := time.Now()
  1763. for _, item := range paths {
  1764. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1765. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1766. return err
  1767. }
  1768. }
  1769. if err := zipWriter.Writer.Close(); err != nil {
  1770. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1771. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1772. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1773. }
  1774. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1775. }
  1776. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1777. params *EventParams,
  1778. ) error {
  1779. users, err := params.getUsers()
  1780. if err != nil {
  1781. return fmt.Errorf("unable to get users: %w", err)
  1782. }
  1783. var failures []string
  1784. executed := 0
  1785. for _, user := range users {
  1786. // if sender is set, the conditions have already been evaluated
  1787. if params.sender == "" {
  1788. if !checkUserConditionOptions(&user, &conditions) {
  1789. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1790. user.Username)
  1791. continue
  1792. }
  1793. }
  1794. executed++
  1795. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1796. failures = append(failures, user.Username)
  1797. params.AddError(err)
  1798. }
  1799. }
  1800. if len(failures) > 0 {
  1801. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1802. }
  1803. if executed == 0 {
  1804. eventManagerLog(logger.LevelError, "no existence check executed")
  1805. return errors.New("no existence check executed")
  1806. }
  1807. return nil
  1808. }
  1809. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1810. conditions dataprovider.ConditionOptions, params *EventParams,
  1811. ) error {
  1812. users, err := params.getUsers()
  1813. if err != nil {
  1814. return fmt.Errorf("unable to get users: %w", err)
  1815. }
  1816. var failures []string
  1817. executed := 0
  1818. for _, user := range users {
  1819. // if sender is set, the conditions have already been evaluated
  1820. if params.sender == "" {
  1821. if !checkUserConditionOptions(&user, &conditions) {
  1822. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1823. user.Username)
  1824. continue
  1825. }
  1826. }
  1827. executed++
  1828. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1829. failures = append(failures, user.Username)
  1830. params.AddError(err)
  1831. }
  1832. }
  1833. if len(failures) > 0 {
  1834. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1835. }
  1836. if executed == 0 {
  1837. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1838. return errors.New("no file/folder compressed")
  1839. }
  1840. return nil
  1841. }
  1842. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1843. params *EventParams,
  1844. ) error {
  1845. addObjectData := false
  1846. replacements := params.getStringReplacements(addObjectData, false)
  1847. replacer := strings.NewReplacer(replacements...)
  1848. switch c.Type {
  1849. case dataprovider.FilesystemActionRename:
  1850. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1851. case dataprovider.FilesystemActionDelete:
  1852. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1853. case dataprovider.FilesystemActionMkdirs:
  1854. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1855. case dataprovider.FilesystemActionExist:
  1856. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1857. case dataprovider.FilesystemActionCompress:
  1858. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1859. case dataprovider.FilesystemActionCopy:
  1860. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1861. default:
  1862. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1863. }
  1864. }
  1865. func executeQuotaResetForUser(user *dataprovider.User) error {
  1866. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1867. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1868. user.Username, err)
  1869. return err
  1870. }
  1871. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1872. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1873. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1874. }
  1875. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1876. numFiles, size, err := user.ScanQuota()
  1877. if err != nil {
  1878. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1879. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1880. }
  1881. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1882. if err != nil {
  1883. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1884. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1885. }
  1886. return nil
  1887. }
  1888. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1889. users, err := params.getUsers()
  1890. if err != nil {
  1891. return fmt.Errorf("unable to get users: %w", err)
  1892. }
  1893. var failures []string
  1894. executed := 0
  1895. for _, user := range users {
  1896. // if sender is set, the conditions have already been evaluated
  1897. if params.sender == "" {
  1898. if !checkUserConditionOptions(&user, &conditions) {
  1899. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  1900. user.Username)
  1901. continue
  1902. }
  1903. }
  1904. executed++
  1905. if err = executeQuotaResetForUser(&user); err != nil {
  1906. params.AddError(err)
  1907. failures = append(failures, user.Username)
  1908. }
  1909. }
  1910. if len(failures) > 0 {
  1911. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  1912. }
  1913. if executed == 0 {
  1914. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1915. return errors.New("no user quota reset executed")
  1916. }
  1917. return nil
  1918. }
  1919. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1920. folders, err := params.getFolders()
  1921. if err != nil {
  1922. return fmt.Errorf("unable to get folders: %w", err)
  1923. }
  1924. var failures []string
  1925. executed := 0
  1926. for _, folder := range folders {
  1927. // if sender is set, the conditions have already been evaluated
  1928. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1929. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1930. folder.Name)
  1931. continue
  1932. }
  1933. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1934. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1935. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1936. failures = append(failures, folder.Name)
  1937. continue
  1938. }
  1939. executed++
  1940. f := vfs.VirtualFolder{
  1941. BaseVirtualFolder: folder,
  1942. VirtualPath: "/",
  1943. }
  1944. numFiles, size, err := f.ScanQuota()
  1945. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1946. if err != nil {
  1947. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1948. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1949. failures = append(failures, folder.Name)
  1950. continue
  1951. }
  1952. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1953. if err != nil {
  1954. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1955. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1956. failures = append(failures, folder.Name)
  1957. }
  1958. }
  1959. if len(failures) > 0 {
  1960. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  1961. }
  1962. if executed == 0 {
  1963. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1964. return errors.New("no folder quota reset executed")
  1965. }
  1966. return nil
  1967. }
  1968. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1969. users, err := params.getUsers()
  1970. if err != nil {
  1971. return fmt.Errorf("unable to get users: %w", err)
  1972. }
  1973. var failures []string
  1974. executed := 0
  1975. for _, user := range users {
  1976. // if sender is set, the conditions have already been evaluated
  1977. if params.sender == "" {
  1978. if !checkUserConditionOptions(&user, &conditions) {
  1979. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  1980. user.Username)
  1981. continue
  1982. }
  1983. }
  1984. executed++
  1985. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1986. if err != nil {
  1987. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1988. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1989. failures = append(failures, user.Username)
  1990. }
  1991. }
  1992. if len(failures) > 0 {
  1993. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  1994. }
  1995. if executed == 0 {
  1996. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1997. return errors.New("no transfer quota reset executed")
  1998. }
  1999. return nil
  2000. }
  2001. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  2002. params *EventParams, actionName string,
  2003. ) error {
  2004. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2005. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  2006. user.Username, err)
  2007. return err
  2008. }
  2009. check := RetentionCheck{
  2010. Folders: folders,
  2011. }
  2012. c := RetentionChecks.Add(check, &user)
  2013. if c == nil {
  2014. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  2015. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2016. }
  2017. defer func() {
  2018. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2019. Username: user.Username,
  2020. ActionName: actionName,
  2021. Results: c.results,
  2022. })
  2023. }()
  2024. if err := c.Start(); err != nil {
  2025. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2026. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2027. }
  2028. return nil
  2029. }
  2030. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2031. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2032. ) error {
  2033. users, err := params.getUsers()
  2034. if err != nil {
  2035. return fmt.Errorf("unable to get users: %w", err)
  2036. }
  2037. var failures []string
  2038. executed := 0
  2039. for _, user := range users {
  2040. // if sender is set, the conditions have already been evaluated
  2041. if params.sender == "" {
  2042. if !checkUserConditionOptions(&user, &conditions) {
  2043. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2044. user.Username)
  2045. continue
  2046. }
  2047. }
  2048. executed++
  2049. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2050. failures = append(failures, user.Username)
  2051. params.AddError(err)
  2052. }
  2053. }
  2054. if len(failures) > 0 {
  2055. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2056. }
  2057. if executed == 0 {
  2058. eventManagerLog(logger.LevelError, "no retention check executed")
  2059. return errors.New("no retention check executed")
  2060. }
  2061. return nil
  2062. }
  2063. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2064. users, err := params.getUsers()
  2065. if err != nil {
  2066. return fmt.Errorf("unable to get users: %w", err)
  2067. }
  2068. var failures []string
  2069. var executed int
  2070. for _, user := range users {
  2071. // if sender is set, the conditions have already been evaluated
  2072. if params.sender == "" {
  2073. if !checkUserConditionOptions(&user, &conditions) {
  2074. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2075. user.Username)
  2076. continue
  2077. }
  2078. }
  2079. executed++
  2080. if user.ExpirationDate > 0 {
  2081. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2082. if expDate.Before(time.Now()) {
  2083. failures = append(failures, user.Username)
  2084. }
  2085. }
  2086. }
  2087. if len(failures) > 0 {
  2088. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2089. }
  2090. if executed == 0 {
  2091. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2092. return errors.New("no user expiration check executed")
  2093. }
  2094. return nil
  2095. }
  2096. func executeMetadataCheckForUser(user *dataprovider.User) error {
  2097. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2098. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  2099. user.Username, err)
  2100. return err
  2101. }
  2102. if !ActiveMetadataChecks.Add(user.Username, user.Role) {
  2103. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  2104. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  2105. }
  2106. defer ActiveMetadataChecks.Remove(user.Username)
  2107. if err := user.CheckMetadataConsistency(); err != nil {
  2108. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  2109. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  2110. }
  2111. return nil
  2112. }
  2113. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2114. users, err := params.getUsers()
  2115. if err != nil {
  2116. return fmt.Errorf("unable to get users: %w", err)
  2117. }
  2118. var failures []string
  2119. var executed int
  2120. for _, user := range users {
  2121. // if sender is set, the conditions have already been evaluated
  2122. if params.sender == "" {
  2123. if !checkUserConditionOptions(&user, &conditions) {
  2124. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, condition options don't match",
  2125. user.Username)
  2126. continue
  2127. }
  2128. }
  2129. executed++
  2130. if err = executeMetadataCheckForUser(&user); err != nil {
  2131. params.AddError(err)
  2132. failures = append(failures, user.Username)
  2133. }
  2134. }
  2135. if len(failures) > 0 {
  2136. return fmt.Errorf("metadata check failed for users: %s", strings.Join(failures, ", "))
  2137. }
  2138. if executed == 0 {
  2139. eventManagerLog(logger.LevelError, "no metadata check executed")
  2140. return errors.New("no metadata check executed")
  2141. }
  2142. return nil
  2143. }
  2144. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2145. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2146. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2147. user.Username, err)
  2148. return err
  2149. }
  2150. if user.ExpirationDate > 0 {
  2151. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2152. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2153. user.Username, expDate)
  2154. return nil
  2155. }
  2156. }
  2157. if user.Filters.PasswordExpiration == 0 {
  2158. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2159. return nil
  2160. }
  2161. days := user.PasswordExpiresIn()
  2162. if days > config.Threshold {
  2163. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2164. user.Username, days, config.Threshold)
  2165. return nil
  2166. }
  2167. body := new(bytes.Buffer)
  2168. data := make(map[string]any)
  2169. data["Username"] = user.Username
  2170. data["Days"] = days
  2171. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2172. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2173. user.Username, err)
  2174. return err
  2175. }
  2176. subject := "SFTPGo password expiration notification"
  2177. startTime := time.Now()
  2178. if err := smtp.SendEmail([]string{user.Email}, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2179. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2180. user.Username, err, time.Since(startTime))
  2181. return err
  2182. }
  2183. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2184. user.Username, days, time.Since(startTime))
  2185. return nil
  2186. }
  2187. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2188. params *EventParams) error {
  2189. users, err := params.getUsers()
  2190. if err != nil {
  2191. return fmt.Errorf("unable to get users: %w", err)
  2192. }
  2193. var failures []string
  2194. for _, user := range users {
  2195. // if sender is set, the conditions have already been evaluated
  2196. if params.sender == "" {
  2197. if !checkUserConditionOptions(&user, &conditions) {
  2198. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2199. user.Username)
  2200. continue
  2201. }
  2202. }
  2203. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2204. params.AddError(err)
  2205. failures = append(failures, user.Username)
  2206. }
  2207. }
  2208. if len(failures) > 0 {
  2209. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2210. }
  2211. return nil
  2212. }
  2213. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2214. admin, err := dataprovider.AdminExists(params.Name)
  2215. exists := err == nil
  2216. if exists && c.Mode == 1 {
  2217. return &admin, nil
  2218. }
  2219. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2220. return nil, err
  2221. }
  2222. replacements := params.getStringReplacements(false, true)
  2223. replacer := strings.NewReplacer(replacements...)
  2224. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2225. var newAdmin dataprovider.Admin
  2226. err = json.Unmarshal([]byte(data), &newAdmin)
  2227. if err != nil {
  2228. return nil, err
  2229. }
  2230. if newAdmin.Password == "" {
  2231. newAdmin.Password = util.GenerateUniqueID()
  2232. }
  2233. if exists {
  2234. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2235. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2236. } else {
  2237. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2238. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2239. }
  2240. return &newAdmin, err
  2241. }
  2242. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2243. user, err := dataprovider.UserExists(params.Name, "")
  2244. exists := err == nil
  2245. if exists && c.Mode == 1 {
  2246. err = user.LoadAndApplyGroupSettings()
  2247. return &user, err
  2248. }
  2249. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2250. return nil, err
  2251. }
  2252. replacements := params.getStringReplacements(false, true)
  2253. replacer := strings.NewReplacer(replacements...)
  2254. data := replaceWithReplacer(c.TemplateUser, replacer)
  2255. var newUser dataprovider.User
  2256. err = json.Unmarshal([]byte(data), &newUser)
  2257. if err != nil {
  2258. return nil, err
  2259. }
  2260. if exists {
  2261. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2262. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2263. } else {
  2264. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2265. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2266. }
  2267. if err != nil {
  2268. return nil, err
  2269. }
  2270. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2271. return &u, err
  2272. }
  2273. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  2274. conditions dataprovider.ConditionOptions,
  2275. ) error {
  2276. var err error
  2277. switch action.Type {
  2278. case dataprovider.ActionTypeHTTP:
  2279. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2280. case dataprovider.ActionTypeCommand:
  2281. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2282. case dataprovider.ActionTypeEmail:
  2283. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2284. case dataprovider.ActionTypeBackup:
  2285. var backupPath string
  2286. backupPath, err = dataprovider.ExecuteBackup()
  2287. if err == nil {
  2288. params.setBackupParams(backupPath)
  2289. }
  2290. case dataprovider.ActionTypeUserQuotaReset:
  2291. err = executeUsersQuotaResetRuleAction(conditions, params)
  2292. case dataprovider.ActionTypeFolderQuotaReset:
  2293. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2294. case dataprovider.ActionTypeTransferQuotaReset:
  2295. err = executeTransferQuotaResetRuleAction(conditions, params)
  2296. case dataprovider.ActionTypeDataRetentionCheck:
  2297. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2298. case dataprovider.ActionTypeMetadataCheck:
  2299. err = executeMetadataCheckRuleAction(conditions, params)
  2300. case dataprovider.ActionTypeFilesystem:
  2301. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2302. case dataprovider.ActionTypePasswordExpirationCheck:
  2303. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2304. case dataprovider.ActionTypeUserExpirationCheck:
  2305. err = executeUserExpirationCheckRuleAction(conditions, params)
  2306. default:
  2307. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2308. }
  2309. if err != nil {
  2310. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2311. }
  2312. params.AddError(err)
  2313. return err
  2314. }
  2315. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2316. *dataprovider.Admin, error,
  2317. ) {
  2318. for _, action := range rule.Actions {
  2319. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2320. startTime := time.Now()
  2321. var user *dataprovider.User
  2322. var admin *dataprovider.Admin
  2323. var err error
  2324. var failedActions []string
  2325. paramsCopy := params.getACopy()
  2326. switch params.Event {
  2327. case IDPLoginAdmin:
  2328. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2329. case IDPLoginUser:
  2330. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2331. default:
  2332. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2333. }
  2334. if err != nil {
  2335. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2336. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2337. failedActions = append(failedActions, action.Name)
  2338. } else {
  2339. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2340. action.Name, rule.Name, time.Since(startTime))
  2341. }
  2342. // execute async actions if any, including failure actions
  2343. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2344. return user, admin, err
  2345. }
  2346. }
  2347. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2348. return nil, nil, errors.New("no action executed")
  2349. }
  2350. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2351. var errRes error
  2352. for _, rule := range rules {
  2353. var failedActions []string
  2354. paramsCopy := params.getACopy()
  2355. for _, action := range rule.Actions {
  2356. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2357. startTime := time.Now()
  2358. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2359. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2360. action.Name, rule.Name, time.Since(startTime), err)
  2361. failedActions = append(failedActions, action.Name)
  2362. // we return the last error, it is ok for now
  2363. errRes = err
  2364. if action.Options.StopOnFailure {
  2365. break
  2366. }
  2367. } else {
  2368. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2369. action.Name, rule.Name, time.Since(startTime))
  2370. }
  2371. }
  2372. }
  2373. // execute async actions if any, including failure actions
  2374. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2375. }
  2376. return errRes
  2377. }
  2378. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2379. eventManager.addAsyncTask()
  2380. defer eventManager.removeAsyncTask()
  2381. for _, rule := range rules {
  2382. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2383. }
  2384. }
  2385. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2386. for _, action := range rule.Actions {
  2387. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2388. startTime := time.Now()
  2389. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2390. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2391. action.Name, rule.Name, time.Since(startTime), err)
  2392. failedActions = append(failedActions, action.Name)
  2393. if action.Options.StopOnFailure {
  2394. break
  2395. }
  2396. } else {
  2397. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2398. action.Name, rule.Name, time.Since(startTime))
  2399. }
  2400. }
  2401. }
  2402. if len(failedActions) > 0 {
  2403. params.updateStatusFromError = false
  2404. // execute failure actions
  2405. for _, action := range rule.Actions {
  2406. if action.Options.IsFailureAction {
  2407. startTime := time.Now()
  2408. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2409. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2410. action.Name, rule.Name, time.Since(startTime), err)
  2411. if action.Options.StopOnFailure {
  2412. break
  2413. }
  2414. } else {
  2415. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2416. action.Name, rule.Name, time.Since(startTime))
  2417. }
  2418. }
  2419. }
  2420. }
  2421. }
  2422. type eventCronJob struct {
  2423. ruleName string
  2424. }
  2425. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2426. if rule.GuardFromConcurrentExecution() {
  2427. task, err := dataprovider.GetTaskByName(rule.Name)
  2428. if err != nil {
  2429. if errors.Is(err, util.ErrNotFound) {
  2430. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2431. task = dataprovider.Task{
  2432. Name: rule.Name,
  2433. UpdateAt: 0,
  2434. Version: 0,
  2435. }
  2436. err = dataprovider.AddTask(rule.Name)
  2437. if err != nil {
  2438. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2439. return task, err
  2440. }
  2441. } else {
  2442. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2443. }
  2444. }
  2445. return task, err
  2446. }
  2447. return dataprovider.Task{}, nil
  2448. }
  2449. func (j *eventCronJob) Run() {
  2450. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2451. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2452. if err != nil {
  2453. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2454. return
  2455. }
  2456. if err := rule.CheckActionsConsistency(""); err != nil {
  2457. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2458. return
  2459. }
  2460. task, err := j.getTask(&rule)
  2461. if err != nil {
  2462. return
  2463. }
  2464. if task.Name != "" {
  2465. updateInterval := 5 * time.Minute
  2466. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2467. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2468. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2469. return
  2470. }
  2471. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2472. if err != nil {
  2473. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2474. rule.Name, err)
  2475. return
  2476. }
  2477. ticker := time.NewTicker(updateInterval)
  2478. done := make(chan bool)
  2479. defer func() {
  2480. done <- true
  2481. ticker.Stop()
  2482. }()
  2483. go func(taskName string) {
  2484. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2485. for {
  2486. select {
  2487. case <-done:
  2488. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2489. return
  2490. case <-ticker.C:
  2491. err := dataprovider.UpdateTaskTimestamp(taskName)
  2492. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2493. }
  2494. }
  2495. }(task.Name)
  2496. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2497. } else {
  2498. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2499. }
  2500. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2501. }
  2502. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2503. func RunOnDemandRule(name string) error {
  2504. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2505. rule, err := dataprovider.EventRuleExists(name)
  2506. if err != nil {
  2507. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2508. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2509. }
  2510. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2511. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2512. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2513. }
  2514. if rule.Status != 1 {
  2515. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2516. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2517. }
  2518. if err := rule.CheckActionsConsistency(""); err != nil {
  2519. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2520. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2521. }
  2522. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2523. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2524. return nil
  2525. }
  2526. type zipWriterWrapper struct {
  2527. Name string
  2528. Entries map[string]bool
  2529. Writer *zip.Writer
  2530. }
  2531. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2532. logger.Log(level, "eventmanager", "", format, v...)
  2533. }