eventmanager.go 94 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "slices"
  33. "strconv"
  34. "strings"
  35. "sync"
  36. "sync/atomic"
  37. "time"
  38. "github.com/bmatcuk/doublestar/v4"
  39. "github.com/klauspost/compress/zip"
  40. "github.com/robfig/cron/v3"
  41. "github.com/rs/xid"
  42. "github.com/sftpgo/sdk"
  43. "github.com/wneessen/go-mail"
  44. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  45. "github.com/drakkan/sftpgo/v2/internal/logger"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/smtp"
  48. "github.com/drakkan/sftpgo/v2/internal/util"
  49. "github.com/drakkan/sftpgo/v2/internal/vfs"
  50. )
  // Internal constants used by the event manager.
  const (
  	// ipBlockedEventName is the literal name "IP Blocked".
  	// NOTE(review): presumably the event name reported for blocked IPs; the
  	// usage is not visible in this part of the file.
  	ipBlockedEventName = "IP Blocked"
  	// maxAttachmentsSize is 10 MiB expressed in bytes.
  	maxAttachmentsSize int64 = 10 * 1024 * 1024
  	// objDataPlaceholder and objDataPlaceholderString are placeholder tokens;
  	// their expansion happens outside this part of the file.
  	objDataPlaceholder       = "{{ObjectData}}"
  	objDataPlaceholderString = "{{ObjectDataString}}"
  	// dateTimeMillisFormat is a time layout with millisecond precision and
  	// no time zone designator.
  	dateTimeMillisFormat = "2006-01-02T15:04:05.000"
  )
  // Event names for the supported Identity Provider (IDP) logins.
  const (
  	// IDPLoginUser is the event name for a user login via an IDP.
  	IDPLoginUser = "IDP login user"
  	// IDPLoginAdmin is the event name for an admin login via an IDP.
  	IDPLoginAdmin = "IDP login admin"
  )
  63. var (
  64. // eventManager handle the supported event rules actions
  65. eventManager eventRulesContainer
  66. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  67. )
  68. func init() {
  69. eventManager = eventRulesContainer{
  70. schedulesMapping: make(map[string][]cron.EntryID),
  71. // arbitrary maximum number of concurrent asynchronous tasks,
  72. // each task could execute multiple actions
  73. concurrencyGuard: make(chan struct{}, 200),
  74. }
  75. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  76. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  77. p := EventParams{
  78. Name: executor,
  79. ObjectName: objectName,
  80. Event: operation,
  81. Status: 1,
  82. ObjectType: objectType,
  83. IP: ip,
  84. Role: role,
  85. Timestamp: time.Now(),
  86. Object: object,
  87. }
  88. if u, ok := object.(*dataprovider.User); ok {
  89. p.Email = u.Email
  90. } else if a, ok := object.(*dataprovider.Admin); ok {
  91. p.Email = a.Email
  92. }
  93. eventManager.handleProviderEvent(p)
  94. })
  95. }
  96. // HandleCertificateEvent checks and executes action rules for certificate events
  97. func HandleCertificateEvent(params EventParams) {
  98. eventManager.handleCertificateEvent(params)
  99. }
  100. // HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider
  101. func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
  102. return eventManager.handleIDPLoginEvent(params, customFields)
  103. }
  104. // eventRulesContainer stores event rules by trigger
  105. type eventRulesContainer struct {
  106. sync.RWMutex
  107. lastLoad atomic.Int64
  108. FsEvents []dataprovider.EventRule
  109. ProviderEvents []dataprovider.EventRule
  110. Schedules []dataprovider.EventRule
  111. IPBlockedEvents []dataprovider.EventRule
  112. CertificateEvents []dataprovider.EventRule
  113. IPDLoginEvents []dataprovider.EventRule
  114. schedulesMapping map[string][]cron.EntryID
  115. concurrencyGuard chan struct{}
  116. }
  117. func (r *eventRulesContainer) addAsyncTask() {
  118. activeHooks.Add(1)
  119. r.concurrencyGuard <- struct{}{}
  120. }
  121. func (r *eventRulesContainer) removeAsyncTask() {
  122. activeHooks.Add(-1)
  123. <-r.concurrencyGuard
  124. }
  125. func (r *eventRulesContainer) getLastLoadTime() int64 {
  126. return r.lastLoad.Load()
  127. }
  128. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  129. r.lastLoad.Store(modTime)
  130. }
  131. // RemoveRule deletes the rule with the specified name
  132. func (r *eventRulesContainer) RemoveRule(name string) {
  133. r.Lock()
  134. defer r.Unlock()
  135. r.removeRuleInternal(name)
  136. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  137. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  138. }
  139. func (r *eventRulesContainer) removeRuleInternal(name string) {
  140. for idx := range r.FsEvents {
  141. if r.FsEvents[idx].Name == name {
  142. lastIdx := len(r.FsEvents) - 1
  143. r.FsEvents[idx] = r.FsEvents[lastIdx]
  144. r.FsEvents = r.FsEvents[:lastIdx]
  145. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  146. return
  147. }
  148. }
  149. for idx := range r.ProviderEvents {
  150. if r.ProviderEvents[idx].Name == name {
  151. lastIdx := len(r.ProviderEvents) - 1
  152. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  153. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  154. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  155. return
  156. }
  157. }
  158. for idx := range r.IPBlockedEvents {
  159. if r.IPBlockedEvents[idx].Name == name {
  160. lastIdx := len(r.IPBlockedEvents) - 1
  161. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  162. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  163. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  164. return
  165. }
  166. }
  167. for idx := range r.CertificateEvents {
  168. if r.CertificateEvents[idx].Name == name {
  169. lastIdx := len(r.CertificateEvents) - 1
  170. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  171. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  172. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  173. return
  174. }
  175. }
  176. for idx := range r.IPDLoginEvents {
  177. if r.IPDLoginEvents[idx].Name == name {
  178. lastIdx := len(r.IPDLoginEvents) - 1
  179. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  180. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  181. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  182. return
  183. }
  184. }
  185. for idx := range r.Schedules {
  186. if r.Schedules[idx].Name == name {
  187. if schedules, ok := r.schedulesMapping[name]; ok {
  188. for _, entryID := range schedules {
  189. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  190. eventScheduler.Remove(entryID)
  191. }
  192. delete(r.schedulesMapping, name)
  193. }
  194. lastIdx := len(r.Schedules) - 1
  195. r.Schedules[idx] = r.Schedules[lastIdx]
  196. r.Schedules = r.Schedules[:lastIdx]
  197. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  198. return
  199. }
  200. }
  201. }
  202. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  203. r.removeRuleInternal(rule.Name)
  204. if rule.DeletedAt > 0 {
  205. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  206. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  207. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  208. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  209. }
  210. return
  211. }
  212. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  213. return
  214. }
  215. switch rule.Trigger {
  216. case dataprovider.EventTriggerFsEvent:
  217. r.FsEvents = append(r.FsEvents, rule)
  218. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  219. case dataprovider.EventTriggerProviderEvent:
  220. r.ProviderEvents = append(r.ProviderEvents, rule)
  221. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  222. case dataprovider.EventTriggerIPBlocked:
  223. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  224. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  225. case dataprovider.EventTriggerCertificate:
  226. r.CertificateEvents = append(r.CertificateEvents, rule)
  227. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  228. case dataprovider.EventTriggerIDPLogin:
  229. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  230. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  231. case dataprovider.EventTriggerSchedule:
  232. for _, schedule := range rule.Conditions.Schedules {
  233. cronSpec := schedule.GetCronSpec()
  234. job := &eventCronJob{
  235. ruleName: dataprovider.ConvertName(rule.Name),
  236. }
  237. entryID, err := eventScheduler.AddJob(cronSpec, job)
  238. if err != nil {
  239. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  240. return
  241. }
  242. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  243. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  244. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  245. }
  246. r.Schedules = append(r.Schedules, rule)
  247. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  248. default:
  249. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  250. }
  251. }
  252. func (r *eventRulesContainer) loadRules() {
  253. eventManagerLog(logger.LevelDebug, "loading updated rules")
  254. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  255. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  256. if err != nil {
  257. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  258. return
  259. }
  260. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  261. if len(rules) > 0 {
  262. r.Lock()
  263. defer r.Unlock()
  264. for _, rule := range rules {
  265. r.addUpdateRuleInternal(rule)
  266. }
  267. }
  268. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  269. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  270. r.setLastLoadTime(modTime)
  271. }
  272. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  273. switch conditions.IDPLoginEvent {
  274. case dataprovider.IDPLoginUser:
  275. if params.Event != IDPLoginUser {
  276. return false
  277. }
  278. case dataprovider.IDPLoginAdmin:
  279. if params.Event != IDPLoginAdmin {
  280. return false
  281. }
  282. }
  283. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  284. }
  285. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  286. if !util.Contains(conditions.ProviderEvents, params.Event) {
  287. return false
  288. }
  289. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  290. return false
  291. }
  292. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  293. return false
  294. }
  295. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  296. return false
  297. }
  298. return true
  299. }
  300. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  301. if !util.Contains(conditions.FsEvents, params.Event) {
  302. return false
  303. }
  304. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  305. return false
  306. }
  307. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  308. return false
  309. }
  310. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  311. return false
  312. }
  313. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  314. return false
  315. }
  316. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  317. return false
  318. }
  319. if params.Event == operationUpload || params.Event == operationDownload {
  320. if conditions.Options.MinFileSize > 0 {
  321. if params.FileSize < conditions.Options.MinFileSize {
  322. return false
  323. }
  324. }
  325. if conditions.Options.MaxFileSize > 0 {
  326. if params.FileSize > conditions.Options.MaxFileSize {
  327. return false
  328. }
  329. }
  330. }
  331. return true
  332. }
  333. // hasFsRules returns true if there are any rules for filesystem event triggers
  334. func (r *eventRulesContainer) hasFsRules() bool {
  335. r.RLock()
  336. defer r.RUnlock()
  337. return len(r.FsEvents) > 0
  338. }
  339. // handleFsEvent executes the rules actions defined for the specified event.
  340. // The boolean parameter indicates whether a sync action was executed
  341. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  342. if params.Protocol == protocolEventAction {
  343. return false, nil
  344. }
  345. r.RLock()
  346. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  347. for _, rule := range r.FsEvents {
  348. if r.checkFsEventMatch(&rule.Conditions, &params) {
  349. if err := rule.CheckActionsConsistency(""); err != nil {
  350. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  351. rule.Name, err, params.Event)
  352. continue
  353. }
  354. hasSyncActions := false
  355. for _, action := range rule.Actions {
  356. if action.Options.ExecuteSync {
  357. hasSyncActions = true
  358. break
  359. }
  360. }
  361. if hasSyncActions {
  362. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  363. } else {
  364. rulesAsync = append(rulesAsync, rule)
  365. }
  366. }
  367. }
  368. r.RUnlock()
  369. params.sender = params.Name
  370. params.addUID()
  371. if len(rulesAsync) > 0 {
  372. go executeAsyncRulesActions(rulesAsync, params)
  373. }
  374. if len(rulesWithSyncActions) > 0 {
  375. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  376. }
  377. return false, nil
  378. }
  379. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  380. *dataprovider.Admin, error,
  381. ) {
  382. r.RLock()
  383. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  384. for _, rule := range r.IPDLoginEvents {
  385. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  386. if err := rule.CheckActionsConsistency(""); err != nil {
  387. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  388. rule.Name, err, params.Event)
  389. continue
  390. }
  391. hasSyncActions := false
  392. for _, action := range rule.Actions {
  393. if action.Options.ExecuteSync {
  394. hasSyncActions = true
  395. break
  396. }
  397. }
  398. if hasSyncActions {
  399. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  400. } else {
  401. rulesAsync = append(rulesAsync, rule)
  402. }
  403. }
  404. }
  405. r.RUnlock()
  406. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  407. return nil, nil, nil
  408. }
  409. params.addIDPCustomFields(customFields)
  410. if len(rulesWithSyncActions) > 1 {
  411. var ruleNames []string
  412. for _, r := range rulesWithSyncActions {
  413. ruleNames = append(ruleNames, r.Name)
  414. }
  415. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  416. }
  417. params.addUID()
  418. if len(rulesAsync) > 0 {
  419. go executeAsyncRulesActions(rulesAsync, params)
  420. }
  421. if len(rulesWithSyncActions) > 0 {
  422. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  423. }
  424. return nil, nil, nil
  425. }
  426. // username is populated for user objects
  427. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  428. r.RLock()
  429. defer r.RUnlock()
  430. var rules []dataprovider.EventRule
  431. for _, rule := range r.ProviderEvents {
  432. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  433. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  434. rules = append(rules, rule)
  435. } else {
  436. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  437. rule.Name, err, params.Event, params.ObjectType)
  438. }
  439. }
  440. }
  441. if len(rules) > 0 {
  442. params.sender = params.ObjectName
  443. go executeAsyncRulesActions(rules, params)
  444. }
  445. }
  446. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  447. r.RLock()
  448. defer r.RUnlock()
  449. if len(r.IPBlockedEvents) == 0 {
  450. return
  451. }
  452. var rules []dataprovider.EventRule
  453. for _, rule := range r.IPBlockedEvents {
  454. if err := rule.CheckActionsConsistency(""); err == nil {
  455. rules = append(rules, rule)
  456. } else {
  457. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  458. rule.Name, err, params.Event)
  459. }
  460. }
  461. if len(rules) > 0 {
  462. go executeAsyncRulesActions(rules, params)
  463. }
  464. }
  465. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  466. r.RLock()
  467. defer r.RUnlock()
  468. if len(r.CertificateEvents) == 0 {
  469. return
  470. }
  471. var rules []dataprovider.EventRule
  472. for _, rule := range r.CertificateEvents {
  473. if err := rule.CheckActionsConsistency(""); err == nil {
  474. rules = append(rules, rule)
  475. } else {
  476. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  477. rule.Name, err, params.Event)
  478. }
  479. }
  480. if len(rules) > 0 {
  481. go executeAsyncRulesActions(rules, params)
  482. }
  483. }
  484. type executedRetentionCheck struct {
  485. Username string
  486. ActionName string
  487. Results []folderRetentionCheckResult
  488. }
  489. // EventParams defines the supported event parameters
  490. type EventParams struct {
  491. Name string
  492. Groups []sdk.GroupMapping
  493. Event string
  494. Status int
  495. VirtualPath string
  496. FsPath string
  497. VirtualTargetPath string
  498. FsTargetPath string
  499. ObjectName string
  500. Extension string
  501. ObjectType string
  502. FileSize int64
  503. Elapsed int64
  504. Protocol string
  505. IP string
  506. Role string
  507. Email string
  508. Timestamp time.Time
  509. UID string
  510. IDPCustomFields *map[string]string
  511. Object plugin.Renderer
  512. Metadata map[string]string
  513. sender string
  514. updateStatusFromError bool
  515. errors []string
  516. retentionChecks []executedRetentionCheck
  517. }
  518. func (p *EventParams) getACopy() *EventParams {
  519. params := *p
  520. params.errors = make([]string, len(p.errors))
  521. copy(params.errors, p.errors)
  522. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  523. for _, c := range p.retentionChecks {
  524. executedCheck := executedRetentionCheck{
  525. Username: c.Username,
  526. ActionName: c.ActionName,
  527. }
  528. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  529. copy(executedCheck.Results, c.Results)
  530. retentionChecks = append(retentionChecks, executedCheck)
  531. }
  532. params.retentionChecks = retentionChecks
  533. if p.IDPCustomFields != nil {
  534. fields := make(map[string]string)
  535. for k, v := range *p.IDPCustomFields {
  536. fields[k] = v
  537. }
  538. params.IDPCustomFields = &fields
  539. }
  540. if len(params.Metadata) > 0 {
  541. metadata := make(map[string]string)
  542. for k, v := range p.Metadata {
  543. metadata[k] = v
  544. }
  545. params.Metadata = metadata
  546. }
  547. return &params
  548. }
  549. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  550. if customFields == nil || len(*customFields) == 0 {
  551. return
  552. }
  553. fields := make(map[string]string)
  554. for k, v := range *customFields {
  555. switch val := v.(type) {
  556. case string:
  557. fields[k] = val
  558. }
  559. }
  560. p.IDPCustomFields = &fields
  561. }
  562. // AddError adds a new error to the event params and update the status if needed
  563. func (p *EventParams) AddError(err error) {
  564. if err == nil {
  565. return
  566. }
  567. if p.updateStatusFromError && p.Status == 1 {
  568. p.Status = 2
  569. }
  570. p.errors = append(p.errors, err.Error())
  571. }
  572. func (p *EventParams) addUID() {
  573. if p.UID == "" {
  574. p.UID = util.GenerateUniqueID()
  575. }
  576. }
  577. func (p *EventParams) setBackupParams(backupPath string) {
  578. if p.sender != "" {
  579. return
  580. }
  581. p.sender = dataprovider.ActionExecutorSystem
  582. p.FsPath = backupPath
  583. p.ObjectName = filepath.Base(backupPath)
  584. p.VirtualPath = "/" + p.ObjectName
  585. p.Timestamp = time.Now()
  586. info, err := os.Stat(backupPath)
  587. if err == nil {
  588. p.FileSize = info.Size()
  589. }
  590. }
  591. func (p *EventParams) getStatusString() string {
  592. switch p.Status {
  593. case 1:
  594. return "OK"
  595. default:
  596. return "KO"
  597. }
  598. }
  599. // getUsers returns users with group settings not applied
  600. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  601. if p.sender == "" {
  602. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeUsers})
  603. if err != nil {
  604. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  605. return nil, errors.New("unable to get users")
  606. }
  607. return dump.Users, nil
  608. }
  609. user, err := p.getUserFromSender()
  610. if err != nil {
  611. return nil, err
  612. }
  613. return []dataprovider.User{user}, nil
  614. }
  615. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  616. if p.sender == dataprovider.ActionExecutorSystem {
  617. return dataprovider.User{
  618. BaseUser: sdk.BaseUser{
  619. Status: 1,
  620. Username: p.sender,
  621. HomeDir: dataprovider.GetBackupsPath(),
  622. Permissions: map[string][]string{
  623. "/": {dataprovider.PermAny},
  624. },
  625. },
  626. }, nil
  627. }
  628. user, err := dataprovider.UserExists(p.sender, "")
  629. if err != nil {
  630. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  631. return user, fmt.Errorf("error getting user %q", p.sender)
  632. }
  633. return user, nil
  634. }
  635. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  636. if p.sender == "" {
  637. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeFolders})
  638. return dump.Folders, err
  639. }
  640. folder, err := dataprovider.GetFolderByName(p.sender)
  641. if err != nil {
  642. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  643. }
  644. return []vfs.BaseVirtualFolder{folder}, nil
  645. }
  646. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  647. if len(p.retentionChecks) == 0 {
  648. return nil, errors.New("no data retention report available")
  649. }
  650. var b bytes.Buffer
  651. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  652. return nil, err
  653. }
  654. return b.Bytes(), nil
  655. }
  656. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  657. var n int64
  658. wr := zip.NewWriter(w)
  659. for _, check := range p.retentionChecks {
  660. data, err := getCSVRetentionReport(check.Results)
  661. if err != nil {
  662. return n, fmt.Errorf("unable to get CSV report: %w", err)
  663. }
  664. dataSize := int64(len(data))
  665. n += dataSize
  666. // we suppose a 3:1 compression ratio
  667. if n > (maxAttachmentsSize * 3) {
  668. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  669. util.ByteCountIEC(n))
  670. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  671. }
  672. fh := &zip.FileHeader{
  673. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  674. Method: zip.Deflate,
  675. Modified: time.Now().UTC(),
  676. }
  677. f, err := wr.CreateHeader(fh)
  678. if err != nil {
  679. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  680. }
  681. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  682. if err != nil {
  683. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  684. }
  685. }
  686. if err := wr.Close(); err != nil {
  687. return n, fmt.Errorf("unable to close zip writer: %w", err)
  688. }
  689. return n, nil
  690. }
  691. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  692. if len(p.retentionChecks) == 0 {
  693. return nil, errors.New("no data retention report available")
  694. }
  695. return &mail.File{
  696. Name: "retention-reports.zip",
  697. Header: make(map[string][]string),
  698. Writer: p.writeCompressedDataRetentionReports,
  699. }, nil
  700. }
  701. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  702. if jsonEscaped {
  703. return util.JSONEscape(val)
  704. }
  705. return val
  706. }
  707. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  708. replacements := []string{
  709. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  710. "{{Event}}", p.Event,
  711. "{{Status}}", fmt.Sprintf("%d", p.Status),
  712. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  713. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  714. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  715. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  716. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  717. "{{ObjectType}}", p.ObjectType,
  718. "{{FileSize}}", strconv.FormatInt(p.FileSize, 10),
  719. "{{Elapsed}}", strconv.FormatInt(p.Elapsed, 10),
  720. "{{Protocol}}", p.Protocol,
  721. "{{IP}}", p.IP,
  722. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  723. "{{Email}}", p.getStringReplacement(p.Email, jsonEscaped),
  724. "{{Timestamp}}", strconv.FormatInt(p.Timestamp.UnixNano(), 10),
  725. "{{DateTime}}", p.Timestamp.UTC().Format(dateTimeMillisFormat),
  726. "{{StatusString}}", p.getStatusString(),
  727. "{{UID}}", p.getStringReplacement(p.UID, jsonEscaped),
  728. "{{Ext}}", p.getStringReplacement(p.Extension, jsonEscaped),
  729. }
  730. if p.VirtualPath != "" {
  731. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  732. }
  733. if p.VirtualTargetPath != "" {
  734. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  735. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  736. }
  737. if len(p.errors) > 0 {
  738. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  739. } else {
  740. replacements = append(replacements, "{{ErrorString}}", "")
  741. }
  742. replacements = append(replacements, objDataPlaceholder, "{}")
  743. replacements = append(replacements, objDataPlaceholderString, "")
  744. if addObjectData {
  745. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  746. if err == nil {
  747. dataString := util.BytesToString(data)
  748. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  749. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  750. }
  751. }
  752. if p.IDPCustomFields != nil {
  753. for k, v := range *p.IDPCustomFields {
  754. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  755. }
  756. }
  757. replacements = append(replacements, "{{Metadata}}", "{}")
  758. replacements = append(replacements, "{{MetadataString}}", "")
  759. if len(p.Metadata) > 0 {
  760. data, err := json.Marshal(p.Metadata)
  761. if err == nil {
  762. dataString := util.BytesToString(data)
  763. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  764. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  765. }
  766. }
  767. return replacements
  768. }
  769. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  770. var b bytes.Buffer
  771. csvWriter := csv.NewWriter(&b)
  772. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  773. "elapsed (ms)", "info", "error"})
  774. if err != nil {
  775. return nil, err
  776. }
  777. for _, result := range results {
  778. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  779. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  780. result.Info, result.Error})
  781. if err != nil {
  782. return nil, err
  783. }
  784. }
  785. csvWriter.Flush()
  786. err = csvWriter.Error()
  787. return b.Bytes(), err
  788. }
// closeWriterAndUpdateQuota closes w and then performs the post-write
// bookkeeping: partial file removal, quota update, command log (for copy
// operations only) and action notification.
//
// The quota is updated for virtualTargetPath if it is set, for
// virtualSourcePath otherwise. The size added to the quota is the current size
// of that path minus truncatedSize.
//
// It returns errTransfer if it is not nil, otherwise the error returned by
// closing w.
func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
	numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
) error {
	var fsDstPath string
	var errDstFs error
	// the writer is always closed first, the close error is only reported if
	// there is no transfer error
	errWrite := w.Close()
	targetPath := virtualSourcePath
	if virtualTargetPath != "" {
		targetPath = virtualTargetPath
		var fsDst vfs.Fs
		fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
		if errTransfer != nil && errDstFs == nil {
			// try to remove a partial file on error. If this fails, we can't do anything
			errRemove := fsDst.Remove(fsDstPath, false)
			conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
		}
	}
	// quota update and notifications require a successful stat of the written path
	info, err := conn.doStatInternal(targetPath, 0, false, false)
	if err == nil {
		updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
		var fsSrcPath string
		var errSrcFs error
		if virtualSourcePath != "" {
			_, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
		}
		// log and notify only if both the source and the target paths were resolved
		if errSrcFs == nil && errDstFs == nil {
			// elapsed time in milliseconds
			elapsed := time.Since(startTime).Nanoseconds() / 1000000
			// from here on a close error is handled as a transfer error, so it
			// is also the error passed to the notification
			if errTransfer == nil {
				errTransfer = errWrite
			}
			if operation == operationCopy {
				logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
					"", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
			}
			ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed, nil) //nolint:errcheck
		}
	} else {
		eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
	}
	if errTransfer != nil {
		return errTransfer
	}
	return errWrite
}
  833. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  834. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  835. if err != nil {
  836. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  837. return
  838. }
  839. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  840. if vfolder.IsIncludedInUserQuota() {
  841. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  842. }
  843. }
  844. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  845. if numFiles == 0 {
  846. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  847. return conn.GetPermissionDeniedError()
  848. }
  849. } else {
  850. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  851. return conn.GetPermissionDeniedError()
  852. }
  853. }
  854. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  855. if !q.HasSpace {
  856. return conn.GetQuotaExceededError()
  857. }
  858. if expectedSize != -1 {
  859. sizeDiff := expectedSize - truncatedSize
  860. if sizeDiff > 0 {
  861. remainingSize := q.GetRemainingSize()
  862. if remainingSize > 0 && remainingSize < sizeDiff {
  863. return conn.GetQuotaExceededError()
  864. }
  865. }
  866. }
  867. return nil
  868. }
  869. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  870. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  871. if err != nil {
  872. return nil, 0, 0, nil, err
  873. }
  874. var truncatedSize, fileSize int64
  875. numFiles := 1
  876. isFileOverwrite := false
  877. info, err := fs.Lstat(fsPath)
  878. if err == nil {
  879. fileSize = info.Size()
  880. if info.IsDir() {
  881. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  882. }
  883. if info.Mode().IsRegular() {
  884. isFileOverwrite = true
  885. truncatedSize = fileSize
  886. }
  887. numFiles = 0
  888. }
  889. if err != nil && !fs.IsNotExist(err) {
  890. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  891. }
  892. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  893. return nil, numFiles, truncatedSize, nil, err
  894. }
  895. f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1, false))
  896. if err != nil {
  897. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  898. }
  899. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  900. if isFileOverwrite {
  901. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  902. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  903. truncatedSize = 0
  904. }
  905. }
  906. if cancelFn == nil {
  907. cancelFn = func() {}
  908. }
  909. if f != nil {
  910. return f, numFiles, truncatedSize, cancelFn, nil
  911. }
  912. return w, numFiles, truncatedSize, cancelFn, nil
  913. }
// addZipEntry adds entryPath to the zip archive wrapped by wr.
// Directories are added together with their contents by calling addZipEntry
// recursively, regular files are added with their content, any other file type
// is skipped. The name of the entry inside the archive is entryPath made
// relative to baseDir.
//
// The entry matching the archive name and the entries already added are
// silently skipped. util.ErrRecursionTooDeep is returned if recursion reaches
// util.MaxRecursion.
func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string, recursion int) error {
	if entryPath == wr.Name {
		// skip the archive itself
		return nil
	}
	if recursion >= util.MaxRecursion {
		eventManagerLog(logger.LevelError, "unable to add zip entry %q, recursion too deep: %v", entryPath, recursion)
		return util.ErrRecursionTooDeep
	}
	recursion++
	info, err := conn.DoStat(entryPath, 1, false)
	if err != nil {
		eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
		return err
	}
	entryName, err := getZipEntryName(entryPath, baseDir)
	if err != nil {
		eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
		return err
	}
	// wr.Entries keeps track of the entry names already added
	if _, ok := wr.Entries[entryName]; ok {
		eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
		return nil
	}
	wr.Entries[entryName] = true
	if info.IsDir() {
		// a directory gets its own entry, with a trailing slash in the name
		_, err = wr.Writer.CreateHeader(&zip.FileHeader{
			Name:     entryName + "/",
			Method:   zip.Deflate,
			Modified: info.ModTime(),
		})
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
			return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
		}
		lister, err := conn.ListDir(entryPath)
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to add zip entry %q, get dir lister error: %v", entryPath, err)
			return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
		}
		defer lister.Close()
		// the directory is read in batches of vfs.ListerBatchSize entries
		for {
			contents, err := lister.Next(vfs.ListerBatchSize)
			// io.EOF marks the last batch: the entries returned with it are
			// still processed before returning
			finished := errors.Is(err, io.EOF)
			if err := lister.convertError(err); err != nil {
				eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
				return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
			}
			for _, info := range contents {
				fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
				if err := addZipEntry(wr, conn, fullPath, baseDir, recursion); err != nil {
					eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
					return err
				}
			}
			if finished {
				return nil
			}
		}
	}
	if !info.Mode().IsRegular() {
		// we only allow regular files
		eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
		return nil
	}
	return addFileToZip(wr, conn, entryPath, entryName, info.ModTime())
}
  981. func addFileToZip(wr *zipWriterWrapper, conn *BaseConnection, entryPath, entryName string, modTime time.Time) error {
  982. reader, cancelFn, err := getFileReader(conn, entryPath)
  983. if err != nil {
  984. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  985. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  986. }
  987. defer cancelFn()
  988. defer reader.Close()
  989. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  990. Name: entryName,
  991. Method: zip.Deflate,
  992. Modified: modTime,
  993. })
  994. if err != nil {
  995. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  996. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  997. }
  998. _, err = io.Copy(f, reader)
  999. return err
  1000. }
  1001. func getZipEntryName(entryPath, baseDir string) (string, error) {
  1002. if !strings.HasPrefix(entryPath, baseDir) {
  1003. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  1004. }
  1005. entryPath = strings.TrimPrefix(entryPath, baseDir)
  1006. return strings.TrimPrefix(entryPath, "/"), nil
  1007. }
  1008. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  1009. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  1010. return nil, nil, conn.GetPermissionDeniedError()
  1011. }
  1012. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  1013. if err != nil {
  1014. return nil, nil, err
  1015. }
  1016. f, r, cancelFn, err := fs.Open(fsPath, 0)
  1017. if err != nil {
  1018. return nil, nil, conn.GetFsError(fs, err)
  1019. }
  1020. if cancelFn == nil {
  1021. cancelFn = func() {}
  1022. }
  1023. if f != nil {
  1024. return f, cancelFn, nil
  1025. }
  1026. return r, cancelFn, nil
  1027. }
  1028. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  1029. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1030. if err != nil {
  1031. return err
  1032. }
  1033. defer cancelFn()
  1034. defer reader.Close()
  1035. _, err = io.Copy(w, reader)
  1036. return err
  1037. }
  1038. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  1039. return func(w io.Writer) (int64, error) {
  1040. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1041. if err != nil {
  1042. return 0, err
  1043. }
  1044. defer cancelFn()
  1045. defer reader.Close()
  1046. return io.CopyN(w, reader, size)
  1047. }
  1048. }
  1049. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  1050. var files []*mail.File
  1051. totalSize := int64(0)
  1052. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  1053. info, err := conn.DoStat(virtualPath, 0, false)
  1054. if err != nil {
  1055. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  1056. }
  1057. if !info.Mode().IsRegular() {
  1058. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  1059. }
  1060. totalSize += info.Size()
  1061. if totalSize > maxAttachmentsSize {
  1062. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  1063. }
  1064. files = append(files, &mail.File{
  1065. Name: path.Base(virtualPath),
  1066. Header: make(map[string][]string),
  1067. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1068. })
  1069. }
  1070. return files, nil
  1071. }
  1072. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  1073. if !strings.Contains(input, "{{") {
  1074. return input
  1075. }
  1076. return replacer.Replace(input)
  1077. }
  1078. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1079. var matched bool
  1080. var err error
  1081. if strings.Contains(p.Pattern, "**") {
  1082. matched, err = doublestar.Match(p.Pattern, name)
  1083. } else {
  1084. matched, err = path.Match(p.Pattern, name)
  1085. }
  1086. if err != nil {
  1087. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1088. return false
  1089. }
  1090. if p.InverseMatch {
  1091. return !matched
  1092. }
  1093. return matched
  1094. }
  1095. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1096. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1097. return false
  1098. }
  1099. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1100. return false
  1101. }
  1102. if !checkEventGroupConditionPatterns(user.Groups, conditions.GroupNames) {
  1103. return false
  1104. }
  1105. return true
  1106. }
  1107. // checkEventConditionPatterns returns false if patterns are defined and no match is found
  1108. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1109. if len(patterns) == 0 {
  1110. return true
  1111. }
  1112. matches := false
  1113. for _, p := range patterns {
  1114. // assume, that multiple InverseMatches are set
  1115. if p.InverseMatch {
  1116. if checkEventConditionPattern(p, name) {
  1117. matches = true
  1118. } else {
  1119. return false
  1120. }
  1121. } else if checkEventConditionPattern(p, name) {
  1122. return true
  1123. }
  1124. }
  1125. return matches
  1126. }
  1127. func checkEventGroupConditionPatterns(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1128. if len(patterns) == 0 {
  1129. return true
  1130. }
  1131. matches := false
  1132. for _, group := range groups {
  1133. for _, p := range patterns {
  1134. // assume, that multiple InverseMatches are set
  1135. if p.InverseMatch {
  1136. if checkEventConditionPattern(p, group.Name) {
  1137. matches = true
  1138. } else {
  1139. return false
  1140. }
  1141. } else {
  1142. if checkEventConditionPattern(p, group.Name) {
  1143. return true
  1144. }
  1145. }
  1146. }
  1147. }
  1148. return matches
  1149. }
  1150. func getHTTPRuleActionEndpoint(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1151. u, err := url.Parse(c.Endpoint)
  1152. if err != nil {
  1153. return "", fmt.Errorf("invalid endpoint: %w", err)
  1154. }
  1155. if strings.Contains(u.Path, "{{") {
  1156. pathComponents := strings.Split(u.Path, "/")
  1157. for idx := range pathComponents {
  1158. part := replaceWithReplacer(pathComponents[idx], replacer)
  1159. if part != pathComponents[idx] {
  1160. pathComponents[idx] = url.PathEscape(part)
  1161. }
  1162. }
  1163. u.Path = ""
  1164. u = u.JoinPath(pathComponents...)
  1165. }
  1166. if len(c.QueryParameters) > 0 {
  1167. q := u.Query()
  1168. for _, keyVal := range c.QueryParameters {
  1169. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1170. }
  1171. u.RawQuery = q.Encode()
  1172. }
  1173. return u.String(), nil
  1174. }
  1175. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1176. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1177. ) error {
  1178. partWriter, err := m.CreatePart(h)
  1179. if err != nil {
  1180. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1181. return err
  1182. }
  1183. if part.Body != "" {
  1184. cType := h.Get("Content-Type")
  1185. if strings.Contains(strings.ToLower(cType), "application/json") {
  1186. replacements := params.getStringReplacements(addObjectData, true)
  1187. jsonReplacer := strings.NewReplacer(replacements...)
  1188. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, jsonReplacer)))
  1189. } else {
  1190. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, replacer)))
  1191. }
  1192. if err != nil {
  1193. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1194. return err
  1195. }
  1196. return nil
  1197. }
  1198. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1199. data, err := params.getCompressedDataRetentionReport()
  1200. if err != nil {
  1201. return err
  1202. }
  1203. _, err = partWriter.Write(data)
  1204. if err != nil {
  1205. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1206. return err
  1207. }
  1208. return nil
  1209. }
  1210. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1211. if err != nil {
  1212. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1213. return err
  1214. }
  1215. return nil
  1216. }
// getHTTPRuleActionBody returns the request body for the HTTP action and the
// content type to set, if any.
//
// No body is returned for GET requests, or if neither a body nor parts are
// configured. A configured body is either the compressed retention reports, if
// it is equal to the retention report placeholder, or the body itself with the
// placeholders replaced, using JSON escaped values for JSON bodies. The
// content type is empty in all these cases.
//
// If parts are configured the returned reader is the read side of a pipe: the
// multipart content is produced by a goroutine while the reader is consumed,
// and the content type is the multipart form data one. If writing a part
// fails the goroutine calls cancel and stops.
func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
	cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
) (io.Reader, string, error) {
	var body io.Reader
	if c.Method == http.MethodGet {
		return body, "", nil
	}
	if c.Body != "" {
		if c.Body == dataprovider.RetentionReportPlaceHolder {
			data, err := params.getCompressedDataRetentionReport()
			if err != nil {
				return body, "", err
			}
			return bytes.NewBuffer(data), "", nil
		}
		if c.HasJSONBody() {
			// a dedicated replacer is required to get JSON escaped values
			replacements := params.getStringReplacements(addObjectData, true)
			jsonReplacer := strings.NewReplacer(replacements...)
			return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
		}
		return bytes.NewBufferString(replaceWithReplacer(c.Body, replacer)), "", nil
	}
	if len(c.Parts) > 0 {
		r, w := io.Pipe()
		m := multipart.NewWriter(w)

		// conn stays nil if no user is provided
		var conn *BaseConnection
		if user.Username != "" {
			var err error
			user, err = getUserForEventAction(user)
			if err != nil {
				return body, "", err
			}
			connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
			err = user.CheckFsRoot(connectionID)
			if err != nil {
				user.CloseFs() //nolint:errcheck
				return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
					user.Username, err)
			}
			conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
		}

		go func() {
			// the deferred calls run in reverse order: the user filesystem is
			// closed first, then the write side of the pipe
			defer w.Close()
			defer user.CloseFs() //nolint:errcheck

			for _, part := range c.Parts {
				h := make(textproto.MIMEHeader)
				if part.Body != "" {
					h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
				} else {
					h.Set("Content-Disposition",
						fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
							multipartQuoteEscaper.Replace(part.Name),
							multipartQuoteEscaper.Replace((path.Base(replaceWithReplacer(part.Filepath, replacer))))))
					// NOTE(review): the content type is detected from the file
					// path as configured, before replacing the placeholders,
					// while the filename above uses the replaced path. Confirm
					// this is intended.
					contentType := mime.TypeByExtension(path.Ext(part.Filepath))
					if contentType == "" {
						contentType = "application/octet-stream"
					}
					h.Set("Content-Type", contentType)
				}
				// the headers configured for the part are set last, so they
				// replace the values set above for the same keys
				for _, keyVal := range part.Headers {
					h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
				}
				if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
					cancel()
					return
				}
			}
			// the multipart writer is closed only if all the parts were written
			m.Close()
		}()

		return r, m.FormDataContentType(), nil
	}
	return body, "", nil
}
  1290. func setHTTPReqHeaders(req *http.Request, c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1291. contentType string,
  1292. ) {
  1293. if contentType != "" {
  1294. req.Header.Set("Content-Type", contentType)
  1295. }
  1296. if c.Username != "" || c.Password.GetPayload() != "" {
  1297. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1298. }
  1299. for _, keyVal := range c.Headers {
  1300. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1301. }
  1302. }
  1303. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1304. if err := c.TryDecryptPassword(); err != nil {
  1305. return err
  1306. }
  1307. addObjectData := false
  1308. if params.Object != nil {
  1309. addObjectData = c.HasObjectData()
  1310. }
  1311. replacements := params.getStringReplacements(addObjectData, false)
  1312. replacer := strings.NewReplacer(replacements...)
  1313. endpoint, err := getHTTPRuleActionEndpoint(&c, replacer)
  1314. if err != nil {
  1315. return err
  1316. }
  1317. ctx, cancel := c.GetContext()
  1318. defer cancel()
  1319. var user dataprovider.User
  1320. if c.HasMultipartFiles() {
  1321. user, err = params.getUserFromSender()
  1322. if err != nil {
  1323. return err
  1324. }
  1325. }
  1326. body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
  1327. if err != nil {
  1328. return err
  1329. }
  1330. if body != nil {
  1331. rc, ok := body.(io.ReadCloser)
  1332. if ok {
  1333. defer rc.Close()
  1334. }
  1335. }
  1336. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1337. if err != nil {
  1338. return err
  1339. }
  1340. setHTTPReqHeaders(req, &c, replacer, contentType)
  1341. client := c.GetHTTPClient()
  1342. defer client.CloseIdleConnections()
  1343. startTime := time.Now()
  1344. resp, err := client.Do(req)
  1345. if err != nil {
  1346. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1347. endpoint, time.Since(startTime), err)
  1348. return fmt.Errorf("error sending HTTP request: %w", err)
  1349. }
  1350. defer resp.Body.Close()
  1351. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1352. endpoint, time.Since(startTime), resp.StatusCode)
  1353. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1354. if rb, err := io.ReadAll(io.LimitReader(resp.Body, 2048)); err == nil {
  1355. eventManagerLog(logger.LevelDebug, "error notification response from endpoint %q: %s",
  1356. endpoint, util.BytesToString(rb))
  1357. }
  1358. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1359. }
  1360. return nil
  1361. }
  1362. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1363. addObjectData := false
  1364. if params.Object != nil {
  1365. for _, k := range c.EnvVars {
  1366. if strings.Contains(k.Value, objDataPlaceholder) || strings.Contains(k.Value, objDataPlaceholderString) {
  1367. addObjectData = true
  1368. break
  1369. }
  1370. }
  1371. }
  1372. replacements := params.getStringReplacements(addObjectData, false)
  1373. replacer := strings.NewReplacer(replacements...)
  1374. args := make([]string, 0, len(c.Args))
  1375. for _, arg := range c.Args {
  1376. args = append(args, replaceWithReplacer(arg, replacer))
  1377. }
  1378. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1379. defer cancel()
  1380. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1381. cmd.Env = []string{}
  1382. for _, keyVal := range c.EnvVars {
  1383. if keyVal.Value == "$" {
  1384. val := os.Getenv(keyVal.Key)
  1385. if val == "" {
  1386. eventManagerLog(logger.LevelDebug, "empty value for environment variable %q", keyVal.Key)
  1387. }
  1388. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, val))
  1389. } else {
  1390. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1391. }
  1392. }
  1393. startTime := time.Now()
  1394. err := cmd.Run()
  1395. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1396. c.Cmd, time.Since(startTime), err)
  1397. return err
  1398. }
  1399. func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) []string {
  1400. if len(addrs) == 0 {
  1401. return nil
  1402. }
  1403. recipients := make([]string, 0, len(addrs))
  1404. for _, recipient := range addrs {
  1405. rcpt := replaceWithReplacer(recipient, replacer)
  1406. if rcpt != "" {
  1407. recipients = append(recipients, rcpt)
  1408. }
  1409. }
  1410. return recipients
  1411. }
  1412. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1413. addObjectData := false
  1414. if params.Object != nil {
  1415. if strings.Contains(c.Body, objDataPlaceholder) || strings.Contains(c.Body, objDataPlaceholderString) {
  1416. addObjectData = true
  1417. }
  1418. }
  1419. replacements := params.getStringReplacements(addObjectData, false)
  1420. replacer := strings.NewReplacer(replacements...)
  1421. body := replaceWithReplacer(c.Body, replacer)
  1422. subject := replaceWithReplacer(c.Subject, replacer)
  1423. recipients := getEmailAddressesWithReplacer(c.Recipients, replacer)
  1424. bcc := getEmailAddressesWithReplacer(c.Bcc, replacer)
  1425. startTime := time.Now()
  1426. var files []*mail.File
  1427. fileAttachments := make([]string, 0, len(c.Attachments))
  1428. for _, attachment := range c.Attachments {
  1429. if attachment == dataprovider.RetentionReportPlaceHolder {
  1430. f, err := params.getRetentionReportsAsMailAttachment()
  1431. if err != nil {
  1432. return err
  1433. }
  1434. files = append(files, f)
  1435. continue
  1436. }
  1437. fileAttachments = append(fileAttachments, attachment)
  1438. }
  1439. if len(fileAttachments) > 0 {
  1440. user, err := params.getUserFromSender()
  1441. if err != nil {
  1442. return err
  1443. }
  1444. user, err = getUserForEventAction(user)
  1445. if err != nil {
  1446. return err
  1447. }
  1448. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1449. err = user.CheckFsRoot(connectionID)
  1450. defer user.CloseFs() //nolint:errcheck
  1451. if err != nil {
  1452. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1453. }
  1454. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1455. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1456. if err != nil {
  1457. return err
  1458. }
  1459. files = append(files, res...)
  1460. }
  1461. err := smtp.SendEmail(recipients, bcc, subject, body, smtp.EmailContentType(c.ContentType), files...)
  1462. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1463. time.Since(startTime), err)
  1464. if err != nil {
  1465. return fmt.Errorf("unable to send email: %w", err)
  1466. }
  1467. return nil
  1468. }
  1469. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1470. err := user.LoadAndApplyGroupSettings()
  1471. if err != nil {
  1472. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1473. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1474. }
  1475. user.UploadDataTransfer = 0
  1476. user.UploadBandwidth = 0
  1477. user.DownloadBandwidth = 0
  1478. user.Filters.DisableFsChecks = false
  1479. user.Filters.FilePatterns = nil
  1480. user.Filters.BandwidthLimits = nil
  1481. for k := range user.Permissions {
  1482. user.Permissions[k] = []string{dataprovider.PermAny}
  1483. }
  1484. return user, nil
  1485. }
  1486. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1487. results := make([]string, 0, len(paths))
  1488. for _, p := range paths {
  1489. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1490. }
  1491. return util.RemoveDuplicates(results, false)
  1492. }
  1493. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1494. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1495. if err != nil {
  1496. return err
  1497. }
  1498. return conn.RemoveFile(fs, fsPath, item, info)
  1499. }
  1500. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1501. user, err := getUserForEventAction(user)
  1502. if err != nil {
  1503. return err
  1504. }
  1505. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1506. err = user.CheckFsRoot(connectionID)
  1507. defer user.CloseFs() //nolint:errcheck
  1508. if err != nil {
  1509. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1510. }
  1511. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1512. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1513. info, err := conn.DoStat(item, 0, false)
  1514. if err != nil {
  1515. if conn.IsNotExistError(err) {
  1516. continue
  1517. }
  1518. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1519. }
  1520. if info.IsDir() {
  1521. if err = conn.RemoveDir(item); err != nil {
  1522. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1523. }
  1524. } else {
  1525. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1526. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1527. }
  1528. }
  1529. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1530. }
  1531. return nil
  1532. }
  1533. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1534. conditions dataprovider.ConditionOptions, params *EventParams,
  1535. ) error {
  1536. users, err := params.getUsers()
  1537. if err != nil {
  1538. return fmt.Errorf("unable to get users: %w", err)
  1539. }
  1540. var failures []string
  1541. executed := 0
  1542. for _, user := range users {
  1543. // if sender is set, the conditions have already been evaluated
  1544. if params.sender == "" {
  1545. if !checkUserConditionOptions(&user, &conditions) {
  1546. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1547. user.Username)
  1548. continue
  1549. }
  1550. }
  1551. executed++
  1552. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1553. params.AddError(err)
  1554. failures = append(failures, user.Username)
  1555. }
  1556. }
  1557. if len(failures) > 0 {
  1558. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1559. }
  1560. if executed == 0 {
  1561. eventManagerLog(logger.LevelError, "no delete executed")
  1562. return errors.New("no delete executed")
  1563. }
  1564. return nil
  1565. }
  1566. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1567. user, err := getUserForEventAction(user)
  1568. if err != nil {
  1569. return err
  1570. }
  1571. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1572. err = user.CheckFsRoot(connectionID)
  1573. defer user.CloseFs() //nolint:errcheck
  1574. if err != nil {
  1575. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1576. }
  1577. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1578. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1579. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1580. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1581. }
  1582. if err = conn.createDirIfMissing(item); err != nil {
  1583. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1584. }
  1585. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1586. }
  1587. return nil
  1588. }
  1589. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1590. conditions dataprovider.ConditionOptions, params *EventParams,
  1591. ) error {
  1592. users, err := params.getUsers()
  1593. if err != nil {
  1594. return fmt.Errorf("unable to get users: %w", err)
  1595. }
  1596. var failures []string
  1597. executed := 0
  1598. for _, user := range users {
  1599. // if sender is set, the conditions have already been evaluated
  1600. if params.sender == "" {
  1601. if !checkUserConditionOptions(&user, &conditions) {
  1602. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1603. user.Username)
  1604. continue
  1605. }
  1606. }
  1607. executed++
  1608. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1609. failures = append(failures, user.Username)
  1610. }
  1611. }
  1612. if len(failures) > 0 {
  1613. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1614. }
  1615. if executed == 0 {
  1616. eventManagerLog(logger.LevelError, "no mkdir executed")
  1617. return errors.New("no mkdir executed")
  1618. }
  1619. return nil
  1620. }
  1621. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1622. user dataprovider.User,
  1623. ) error {
  1624. user, err := getUserForEventAction(user)
  1625. if err != nil {
  1626. return err
  1627. }
  1628. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1629. err = user.CheckFsRoot(connectionID)
  1630. defer user.CloseFs() //nolint:errcheck
  1631. if err != nil {
  1632. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1633. }
  1634. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1635. for _, item := range renames {
  1636. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1637. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1638. if err = conn.renameInternal(source, target, true); err != nil {
  1639. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1640. }
  1641. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1642. }
  1643. return nil
  1644. }
  1645. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1646. user dataprovider.User,
  1647. ) error {
  1648. user, err := getUserForEventAction(user)
  1649. if err != nil {
  1650. return err
  1651. }
  1652. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1653. err = user.CheckFsRoot(connectionID)
  1654. defer user.CloseFs() //nolint:errcheck
  1655. if err != nil {
  1656. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1657. }
  1658. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1659. for _, item := range copy {
  1660. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1661. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1662. if strings.HasSuffix(item.Key, "/") {
  1663. source += "/"
  1664. }
  1665. if strings.HasSuffix(item.Value, "/") {
  1666. target += "/"
  1667. }
  1668. if err = conn.Copy(source, target); err != nil {
  1669. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1670. }
  1671. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1672. }
  1673. return nil
  1674. }
  1675. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1676. user dataprovider.User,
  1677. ) error {
  1678. user, err := getUserForEventAction(user)
  1679. if err != nil {
  1680. return err
  1681. }
  1682. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1683. err = user.CheckFsRoot(connectionID)
  1684. defer user.CloseFs() //nolint:errcheck
  1685. if err != nil {
  1686. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1687. }
  1688. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1689. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1690. if _, err = conn.DoStat(item, 0, false); err != nil {
  1691. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1692. }
  1693. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1694. }
  1695. return nil
  1696. }
  1697. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1698. conditions dataprovider.ConditionOptions, params *EventParams,
  1699. ) error {
  1700. users, err := params.getUsers()
  1701. if err != nil {
  1702. return fmt.Errorf("unable to get users: %w", err)
  1703. }
  1704. var failures []string
  1705. executed := 0
  1706. for _, user := range users {
  1707. // if sender is set, the conditions have already been evaluated
  1708. if params.sender == "" {
  1709. if !checkUserConditionOptions(&user, &conditions) {
  1710. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1711. user.Username)
  1712. continue
  1713. }
  1714. }
  1715. executed++
  1716. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1717. failures = append(failures, user.Username)
  1718. params.AddError(err)
  1719. }
  1720. }
  1721. if len(failures) > 0 {
  1722. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1723. }
  1724. if executed == 0 {
  1725. eventManagerLog(logger.LevelError, "no rename executed")
  1726. return errors.New("no rename executed")
  1727. }
  1728. return nil
  1729. }
  1730. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1731. conditions dataprovider.ConditionOptions, params *EventParams,
  1732. ) error {
  1733. users, err := params.getUsers()
  1734. if err != nil {
  1735. return fmt.Errorf("unable to get users: %w", err)
  1736. }
  1737. var failures []string
  1738. var executed int
  1739. for _, user := range users {
  1740. // if sender is set, the conditions have already been evaluated
  1741. if params.sender == "" {
  1742. if !checkUserConditionOptions(&user, &conditions) {
  1743. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1744. user.Username)
  1745. continue
  1746. }
  1747. }
  1748. executed++
  1749. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1750. failures = append(failures, user.Username)
  1751. params.AddError(err)
  1752. }
  1753. }
  1754. if len(failures) > 0 {
  1755. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1756. }
  1757. if executed == 0 {
  1758. eventManagerLog(logger.LevelError, "no copy executed")
  1759. return errors.New("no copy executed")
  1760. }
  1761. return nil
  1762. }
  1763. func getArchiveBaseDir(paths []string) string {
  1764. var parentDirs []string
  1765. for _, p := range paths {
  1766. parentDirs = append(parentDirs, path.Dir(p))
  1767. }
  1768. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1769. baseDir := "/"
  1770. if len(parentDirs) == 1 {
  1771. baseDir = parentDirs[0]
  1772. }
  1773. return baseDir
  1774. }
  1775. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1776. if info.IsDir() {
  1777. var dirSize int64
  1778. lister, err := conn.ListDir(p)
  1779. if err != nil {
  1780. return 0, err
  1781. }
  1782. defer lister.Close()
  1783. for {
  1784. entries, err := lister.Next(vfs.ListerBatchSize)
  1785. finished := errors.Is(err, io.EOF)
  1786. if err != nil && !finished {
  1787. return 0, err
  1788. }
  1789. for _, entry := range entries {
  1790. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1791. if err != nil {
  1792. return 0, err
  1793. }
  1794. dirSize += size
  1795. }
  1796. if finished {
  1797. return dirSize, nil
  1798. }
  1799. }
  1800. }
  1801. if info.Mode().IsRegular() {
  1802. return info.Size(), nil
  1803. }
  1804. return 0, nil
  1805. }
  1806. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1807. q, _ := conn.HasSpace(false, false, zipPath)
  1808. if q.HasSpace && q.GetRemainingSize() > 0 {
  1809. var size int64
  1810. for _, item := range paths {
  1811. info, err := conn.DoStat(item, 1, false)
  1812. if err != nil {
  1813. return size, err
  1814. }
  1815. itemSize, err := getSizeForPath(conn, item, info)
  1816. if err != nil {
  1817. return size, err
  1818. }
  1819. size += itemSize
  1820. }
  1821. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1822. // we assume the zip size will be half of the real size
  1823. return size / 2, nil
  1824. }
  1825. return -1, nil
  1826. }
  1827. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1828. user dataprovider.User,
  1829. ) error {
  1830. user, err := getUserForEventAction(user)
  1831. if err != nil {
  1832. return err
  1833. }
  1834. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1835. err = user.CheckFsRoot(connectionID)
  1836. defer user.CloseFs() //nolint:errcheck
  1837. if err != nil {
  1838. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1839. }
  1840. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1841. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1842. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1843. paths := make([]string, 0, len(c.Paths))
  1844. for idx := range c.Paths {
  1845. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1846. if p == name {
  1847. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1848. }
  1849. paths = append(paths, p)
  1850. }
  1851. paths = util.RemoveDuplicates(paths, false)
  1852. estimatedSize, err := estimateZipSize(conn, name, paths)
  1853. if err != nil {
  1854. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1855. return fmt.Errorf("unable to estimate archive size: %w", err)
  1856. }
  1857. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1858. if err != nil {
  1859. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1860. return fmt.Errorf("unable to create archive: %w", err)
  1861. }
  1862. defer cancelFn()
  1863. baseDir := getArchiveBaseDir(paths)
  1864. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1865. zipWriter := &zipWriterWrapper{
  1866. Name: name,
  1867. Writer: zip.NewWriter(writer),
  1868. Entries: make(map[string]bool),
  1869. }
  1870. startTime := time.Now()
  1871. for _, item := range paths {
  1872. if err := addZipEntry(zipWriter, conn, item, baseDir, 0); err != nil {
  1873. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1874. return err
  1875. }
  1876. }
  1877. if err := zipWriter.Writer.Close(); err != nil {
  1878. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1879. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1880. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1881. }
  1882. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1883. }
  1884. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1885. params *EventParams,
  1886. ) error {
  1887. users, err := params.getUsers()
  1888. if err != nil {
  1889. return fmt.Errorf("unable to get users: %w", err)
  1890. }
  1891. var failures []string
  1892. executed := 0
  1893. for _, user := range users {
  1894. // if sender is set, the conditions have already been evaluated
  1895. if params.sender == "" {
  1896. if !checkUserConditionOptions(&user, &conditions) {
  1897. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1898. user.Username)
  1899. continue
  1900. }
  1901. }
  1902. executed++
  1903. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1904. failures = append(failures, user.Username)
  1905. params.AddError(err)
  1906. }
  1907. }
  1908. if len(failures) > 0 {
  1909. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1910. }
  1911. if executed == 0 {
  1912. eventManagerLog(logger.LevelError, "no existence check executed")
  1913. return errors.New("no existence check executed")
  1914. }
  1915. return nil
  1916. }
  1917. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1918. conditions dataprovider.ConditionOptions, params *EventParams,
  1919. ) error {
  1920. users, err := params.getUsers()
  1921. if err != nil {
  1922. return fmt.Errorf("unable to get users: %w", err)
  1923. }
  1924. var failures []string
  1925. executed := 0
  1926. for _, user := range users {
  1927. // if sender is set, the conditions have already been evaluated
  1928. if params.sender == "" {
  1929. if !checkUserConditionOptions(&user, &conditions) {
  1930. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1931. user.Username)
  1932. continue
  1933. }
  1934. }
  1935. executed++
  1936. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1937. failures = append(failures, user.Username)
  1938. params.AddError(err)
  1939. }
  1940. }
  1941. if len(failures) > 0 {
  1942. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1943. }
  1944. if executed == 0 {
  1945. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1946. return errors.New("no file/folder compressed")
  1947. }
  1948. return nil
  1949. }
  1950. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1951. params *EventParams,
  1952. ) error {
  1953. addObjectData := false
  1954. replacements := params.getStringReplacements(addObjectData, false)
  1955. replacer := strings.NewReplacer(replacements...)
  1956. switch c.Type {
  1957. case dataprovider.FilesystemActionRename:
  1958. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1959. case dataprovider.FilesystemActionDelete:
  1960. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1961. case dataprovider.FilesystemActionMkdirs:
  1962. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1963. case dataprovider.FilesystemActionExist:
  1964. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1965. case dataprovider.FilesystemActionCompress:
  1966. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1967. case dataprovider.FilesystemActionCopy:
  1968. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1969. default:
  1970. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1971. }
  1972. }
  1973. func executeQuotaResetForUser(user *dataprovider.User) error {
  1974. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1975. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1976. user.Username, err)
  1977. return err
  1978. }
  1979. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1980. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1981. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1982. }
  1983. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1984. numFiles, size, err := user.ScanQuota()
  1985. if err != nil {
  1986. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1987. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1988. }
  1989. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1990. if err != nil {
  1991. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1992. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1993. }
  1994. return nil
  1995. }
  1996. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1997. users, err := params.getUsers()
  1998. if err != nil {
  1999. return fmt.Errorf("unable to get users: %w", err)
  2000. }
  2001. var failures []string
  2002. executed := 0
  2003. for _, user := range users {
  2004. // if sender is set, the conditions have already been evaluated
  2005. if params.sender == "" {
  2006. if !checkUserConditionOptions(&user, &conditions) {
  2007. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  2008. user.Username)
  2009. continue
  2010. }
  2011. }
  2012. executed++
  2013. if err = executeQuotaResetForUser(&user); err != nil {
  2014. params.AddError(err)
  2015. failures = append(failures, user.Username)
  2016. }
  2017. }
  2018. if len(failures) > 0 {
  2019. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  2020. }
  2021. if executed == 0 {
  2022. eventManagerLog(logger.LevelError, "no user quota reset executed")
  2023. return errors.New("no user quota reset executed")
  2024. }
  2025. return nil
  2026. }
  2027. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2028. folders, err := params.getFolders()
  2029. if err != nil {
  2030. return fmt.Errorf("unable to get folders: %w", err)
  2031. }
  2032. var failures []string
  2033. executed := 0
  2034. for _, folder := range folders {
  2035. // if sender is set, the conditions have already been evaluated
  2036. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  2037. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  2038. folder.Name)
  2039. continue
  2040. }
  2041. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  2042. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  2043. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  2044. failures = append(failures, folder.Name)
  2045. continue
  2046. }
  2047. executed++
  2048. f := vfs.VirtualFolder{
  2049. BaseVirtualFolder: folder,
  2050. VirtualPath: "/",
  2051. }
  2052. numFiles, size, err := f.ScanQuota()
  2053. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  2054. if err != nil {
  2055. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  2056. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  2057. failures = append(failures, folder.Name)
  2058. continue
  2059. }
  2060. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  2061. if err != nil {
  2062. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  2063. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  2064. failures = append(failures, folder.Name)
  2065. }
  2066. }
  2067. if len(failures) > 0 {
  2068. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  2069. }
  2070. if executed == 0 {
  2071. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  2072. return errors.New("no folder quota reset executed")
  2073. }
  2074. return nil
  2075. }
  2076. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2077. users, err := params.getUsers()
  2078. if err != nil {
  2079. return fmt.Errorf("unable to get users: %w", err)
  2080. }
  2081. var failures []string
  2082. executed := 0
  2083. for _, user := range users {
  2084. // if sender is set, the conditions have already been evaluated
  2085. if params.sender == "" {
  2086. if !checkUserConditionOptions(&user, &conditions) {
  2087. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  2088. user.Username)
  2089. continue
  2090. }
  2091. }
  2092. executed++
  2093. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  2094. if err != nil {
  2095. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  2096. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  2097. failures = append(failures, user.Username)
  2098. }
  2099. }
  2100. if len(failures) > 0 {
  2101. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  2102. }
  2103. if executed == 0 {
  2104. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  2105. return errors.New("no transfer quota reset executed")
  2106. }
  2107. return nil
  2108. }
  2109. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  2110. params *EventParams, actionName string,
  2111. ) error {
  2112. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2113. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  2114. user.Username, err)
  2115. return err
  2116. }
  2117. check := RetentionCheck{
  2118. Folders: folders,
  2119. }
  2120. c := RetentionChecks.Add(check, &user)
  2121. if c == nil {
  2122. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  2123. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2124. }
  2125. defer func() {
  2126. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2127. Username: user.Username,
  2128. ActionName: actionName,
  2129. Results: c.results,
  2130. })
  2131. }()
  2132. if err := c.Start(); err != nil {
  2133. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2134. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2135. }
  2136. return nil
  2137. }
  2138. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2139. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2140. ) error {
  2141. users, err := params.getUsers()
  2142. if err != nil {
  2143. return fmt.Errorf("unable to get users: %w", err)
  2144. }
  2145. var failures []string
  2146. executed := 0
  2147. for _, user := range users {
  2148. // if sender is set, the conditions have already been evaluated
  2149. if params.sender == "" {
  2150. if !checkUserConditionOptions(&user, &conditions) {
  2151. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2152. user.Username)
  2153. continue
  2154. }
  2155. }
  2156. executed++
  2157. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2158. failures = append(failures, user.Username)
  2159. params.AddError(err)
  2160. }
  2161. }
  2162. if len(failures) > 0 {
  2163. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2164. }
  2165. if executed == 0 {
  2166. eventManagerLog(logger.LevelError, "no retention check executed")
  2167. return errors.New("no retention check executed")
  2168. }
  2169. return nil
  2170. }
  2171. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2172. users, err := params.getUsers()
  2173. if err != nil {
  2174. return fmt.Errorf("unable to get users: %w", err)
  2175. }
  2176. var failures []string
  2177. var executed int
  2178. for _, user := range users {
  2179. // if sender is set, the conditions have already been evaluated
  2180. if params.sender == "" {
  2181. if !checkUserConditionOptions(&user, &conditions) {
  2182. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2183. user.Username)
  2184. continue
  2185. }
  2186. }
  2187. executed++
  2188. if user.ExpirationDate > 0 {
  2189. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2190. if expDate.Before(time.Now()) {
  2191. failures = append(failures, user.Username)
  2192. }
  2193. }
  2194. }
  2195. if len(failures) > 0 {
  2196. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2197. }
  2198. if executed == 0 {
  2199. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2200. return errors.New("no user expiration check executed")
  2201. }
  2202. return nil
  2203. }
  2204. func executeInactivityCheckForUser(user *dataprovider.User, config dataprovider.EventActionUserInactivity, when time.Time) error {
  2205. if config.DeleteThreshold > 0 && (user.Status == 0 || config.DisableThreshold == 0) {
  2206. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DeleteThreshold {
  2207. err := dataprovider.DeleteUser(user.Username, dataprovider.ActionExecutorSystem, "", "")
  2208. eventManagerLog(logger.LevelInfo, "deleting inactive user %q, days of inactivity: %d/%d, err: %v",
  2209. user.Username, inactivityDays, config.DeleteThreshold, err)
  2210. if err != nil {
  2211. return fmt.Errorf("unable to delete inactive user %q", user.Username)
  2212. }
  2213. return fmt.Errorf("inactive user %q deleted. Number of days of inactivity: %d", user.Username, inactivityDays)
  2214. }
  2215. }
  2216. if config.DisableThreshold > 0 && user.Status > 0 {
  2217. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DisableThreshold {
  2218. user.Status = 0
  2219. err := dataprovider.UpdateUser(user, dataprovider.ActionExecutorSystem, "", "")
  2220. eventManagerLog(logger.LevelInfo, "disabling inactive user %q, days of inactivity: %d/%d, err: %v",
  2221. user.Username, inactivityDays, config.DisableThreshold, err)
  2222. if err != nil {
  2223. return fmt.Errorf("unable to disable inactive user %q", user.Username)
  2224. }
  2225. return fmt.Errorf("inactive user %q disabled. Number of days of inactivity: %d", user.Username, inactivityDays)
  2226. }
  2227. }
  2228. return nil
  2229. }
  2230. func executeUserInactivityCheckRuleAction(config dataprovider.EventActionUserInactivity,
  2231. conditions dataprovider.ConditionOptions,
  2232. params *EventParams,
  2233. when time.Time,
  2234. ) error {
  2235. users, err := params.getUsers()
  2236. if err != nil {
  2237. return fmt.Errorf("unable to get users: %w", err)
  2238. }
  2239. var failures []string
  2240. for _, user := range users {
  2241. // if sender is set, the conditions have already been evaluated
  2242. if params.sender == "" {
  2243. if !checkUserConditionOptions(&user, &conditions) {
  2244. eventManagerLog(logger.LevelDebug, "skipping inactivity check for user %q, condition options don't match",
  2245. user.Username)
  2246. continue
  2247. }
  2248. }
  2249. if err = executeInactivityCheckForUser(&user, config, when); err != nil {
  2250. params.AddError(err)
  2251. failures = append(failures, user.Username)
  2252. }
  2253. }
  2254. if len(failures) > 0 {
  2255. return fmt.Errorf("executed inactivity check actions for users: %s", strings.Join(failures, ", "))
  2256. }
  2257. return nil
  2258. }
  2259. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2260. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2261. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2262. user.Username, err)
  2263. return err
  2264. }
  2265. if user.ExpirationDate > 0 {
  2266. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2267. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2268. user.Username, expDate)
  2269. return nil
  2270. }
  2271. }
  2272. if user.Filters.PasswordExpiration == 0 {
  2273. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2274. return nil
  2275. }
  2276. days := user.PasswordExpiresIn()
  2277. if days > config.Threshold {
  2278. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2279. user.Username, days, config.Threshold)
  2280. return nil
  2281. }
  2282. body := new(bytes.Buffer)
  2283. data := make(map[string]any)
  2284. data["Username"] = user.Username
  2285. data["Days"] = days
  2286. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2287. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2288. user.Username, err)
  2289. return err
  2290. }
  2291. subject := "SFTPGo password expiration notification"
  2292. startTime := time.Now()
  2293. if err := smtp.SendEmail([]string{user.Email}, nil, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2294. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2295. user.Username, err, time.Since(startTime))
  2296. return err
  2297. }
  2298. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2299. user.Username, days, time.Since(startTime))
  2300. return nil
  2301. }
  2302. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2303. params *EventParams) error {
  2304. users, err := params.getUsers()
  2305. if err != nil {
  2306. return fmt.Errorf("unable to get users: %w", err)
  2307. }
  2308. var failures []string
  2309. for _, user := range users {
  2310. // if sender is set, the conditions have already been evaluated
  2311. if params.sender == "" {
  2312. if !checkUserConditionOptions(&user, &conditions) {
  2313. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2314. user.Username)
  2315. continue
  2316. }
  2317. }
  2318. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2319. params.AddError(err)
  2320. failures = append(failures, user.Username)
  2321. }
  2322. }
  2323. if len(failures) > 0 {
  2324. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2325. }
  2326. return nil
  2327. }
  2328. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2329. admin, err := dataprovider.AdminExists(params.Name)
  2330. exists := err == nil
  2331. if exists && c.Mode == 1 {
  2332. return &admin, nil
  2333. }
  2334. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2335. return nil, err
  2336. }
  2337. replacements := params.getStringReplacements(false, true)
  2338. replacer := strings.NewReplacer(replacements...)
  2339. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2340. var newAdmin dataprovider.Admin
  2341. err = json.Unmarshal(util.StringToBytes(data), &newAdmin)
  2342. if err != nil {
  2343. return nil, err
  2344. }
  2345. if exists {
  2346. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2347. // Not sure if this makes sense, but it shouldn't hurt.
  2348. if newAdmin.Password == "" {
  2349. newAdmin.Password = admin.Password
  2350. }
  2351. newAdmin.Filters.TOTPConfig = admin.Filters.TOTPConfig
  2352. newAdmin.Filters.RecoveryCodes = admin.Filters.RecoveryCodes
  2353. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2354. } else {
  2355. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2356. if newAdmin.Password == "" {
  2357. newAdmin.Password = util.GenerateUniqueID()
  2358. }
  2359. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2360. }
  2361. return &newAdmin, err
  2362. }
  2363. func preserveUserProfile(user, newUser *dataprovider.User) {
  2364. if newUser.CanChangePassword() && user.Password != "" {
  2365. newUser.Password = user.Password
  2366. }
  2367. if newUser.CanManagePublicKeys() && len(user.PublicKeys) > 0 {
  2368. newUser.PublicKeys = user.PublicKeys
  2369. }
  2370. if newUser.CanManageTLSCerts() {
  2371. if len(user.Filters.TLSCerts) > 0 {
  2372. newUser.Filters.TLSCerts = user.Filters.TLSCerts
  2373. }
  2374. }
  2375. if newUser.CanChangeInfo() {
  2376. if user.Description != "" {
  2377. newUser.Description = user.Description
  2378. }
  2379. if user.Email != "" {
  2380. newUser.Email = user.Email
  2381. }
  2382. }
  2383. if newUser.CanChangeAPIKeyAuth() {
  2384. newUser.Filters.AllowAPIKeyAuth = user.Filters.AllowAPIKeyAuth
  2385. }
  2386. newUser.Filters.RecoveryCodes = user.Filters.RecoveryCodes
  2387. newUser.Filters.TOTPConfig = user.Filters.TOTPConfig
  2388. newUser.LastPasswordChange = user.LastPasswordChange
  2389. newUser.SetEmptySecretsIfNil()
  2390. }
  2391. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2392. user, err := dataprovider.UserExists(params.Name, "")
  2393. exists := err == nil
  2394. if exists && c.Mode == 1 {
  2395. err = user.LoadAndApplyGroupSettings()
  2396. return &user, err
  2397. }
  2398. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2399. return nil, err
  2400. }
  2401. replacements := params.getStringReplacements(false, true)
  2402. replacer := strings.NewReplacer(replacements...)
  2403. data := replaceWithReplacer(c.TemplateUser, replacer)
  2404. var newUser dataprovider.User
  2405. err = json.Unmarshal(util.StringToBytes(data), &newUser)
  2406. if err != nil {
  2407. return nil, err
  2408. }
  2409. if exists {
  2410. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2411. preserveUserProfile(&user, &newUser)
  2412. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2413. } else {
  2414. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2415. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2416. }
  2417. if err != nil {
  2418. return nil, err
  2419. }
  2420. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2421. return &u, err
  2422. }
  2423. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams, //nolint:gocyclo
  2424. conditions dataprovider.ConditionOptions,
  2425. ) error {
  2426. if len(conditions.EventStatuses) > 0 && !slices.Contains(conditions.EventStatuses, params.Status) {
  2427. eventManagerLog(logger.LevelDebug, "skipping action %s, event status %d does not match: %v",
  2428. action.Name, params.Status, conditions.EventStatuses)
  2429. return nil
  2430. }
  2431. var err error
  2432. switch action.Type {
  2433. case dataprovider.ActionTypeHTTP:
  2434. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2435. case dataprovider.ActionTypeCommand:
  2436. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2437. case dataprovider.ActionTypeEmail:
  2438. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2439. case dataprovider.ActionTypeBackup:
  2440. var backupPath string
  2441. backupPath, err = dataprovider.ExecuteBackup()
  2442. if err == nil {
  2443. params.setBackupParams(backupPath)
  2444. }
  2445. case dataprovider.ActionTypeUserQuotaReset:
  2446. err = executeUsersQuotaResetRuleAction(conditions, params)
  2447. case dataprovider.ActionTypeFolderQuotaReset:
  2448. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2449. case dataprovider.ActionTypeTransferQuotaReset:
  2450. err = executeTransferQuotaResetRuleAction(conditions, params)
  2451. case dataprovider.ActionTypeDataRetentionCheck:
  2452. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2453. case dataprovider.ActionTypeFilesystem:
  2454. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2455. case dataprovider.ActionTypePasswordExpirationCheck:
  2456. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2457. case dataprovider.ActionTypeUserExpirationCheck:
  2458. err = executeUserExpirationCheckRuleAction(conditions, params)
  2459. case dataprovider.ActionTypeUserInactivityCheck:
  2460. err = executeUserInactivityCheckRuleAction(action.Options.UserInactivityConfig, conditions, params, time.Now())
  2461. case dataprovider.ActionTypeRotateLogs:
  2462. err = logger.RotateLogFile()
  2463. default:
  2464. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2465. }
  2466. if err != nil {
  2467. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2468. }
  2469. params.AddError(err)
  2470. return err
  2471. }
  2472. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2473. *dataprovider.Admin, error,
  2474. ) {
  2475. for _, action := range rule.Actions {
  2476. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2477. startTime := time.Now()
  2478. var user *dataprovider.User
  2479. var admin *dataprovider.Admin
  2480. var err error
  2481. var failedActions []string
  2482. paramsCopy := params.getACopy()
  2483. switch params.Event {
  2484. case IDPLoginAdmin:
  2485. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2486. case IDPLoginUser:
  2487. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2488. default:
  2489. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2490. }
  2491. if err != nil {
  2492. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2493. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2494. failedActions = append(failedActions, action.Name)
  2495. } else {
  2496. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2497. action.Name, rule.Name, time.Since(startTime))
  2498. }
  2499. // execute async actions if any, including failure actions
  2500. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2501. return user, admin, err
  2502. }
  2503. }
  2504. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2505. return nil, nil, errors.New("no action executed")
  2506. }
  2507. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2508. var errRes error
  2509. for _, rule := range rules {
  2510. var failedActions []string
  2511. paramsCopy := params.getACopy()
  2512. for _, action := range rule.Actions {
  2513. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2514. startTime := time.Now()
  2515. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2516. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2517. action.Name, rule.Name, time.Since(startTime), err)
  2518. failedActions = append(failedActions, action.Name)
  2519. // we return the last error, it is ok for now
  2520. errRes = err
  2521. if action.Options.StopOnFailure {
  2522. break
  2523. }
  2524. } else {
  2525. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2526. action.Name, rule.Name, time.Since(startTime))
  2527. }
  2528. }
  2529. }
  2530. // execute async actions if any, including failure actions
  2531. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2532. }
  2533. return errRes
  2534. }
  2535. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2536. eventManager.addAsyncTask()
  2537. defer eventManager.removeAsyncTask()
  2538. params.addUID()
  2539. for _, rule := range rules {
  2540. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2541. }
  2542. }
  2543. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2544. for _, action := range rule.Actions {
  2545. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2546. startTime := time.Now()
  2547. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2548. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2549. action.Name, rule.Name, time.Since(startTime), err)
  2550. failedActions = append(failedActions, action.Name)
  2551. if action.Options.StopOnFailure {
  2552. break
  2553. }
  2554. } else {
  2555. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2556. action.Name, rule.Name, time.Since(startTime))
  2557. }
  2558. }
  2559. }
  2560. if len(failedActions) > 0 {
  2561. params.updateStatusFromError = false
  2562. // execute failure actions
  2563. for _, action := range rule.Actions {
  2564. if action.Options.IsFailureAction {
  2565. startTime := time.Now()
  2566. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2567. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2568. action.Name, rule.Name, time.Since(startTime), err)
  2569. if action.Options.StopOnFailure {
  2570. break
  2571. }
  2572. } else {
  2573. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2574. action.Name, rule.Name, time.Since(startTime))
  2575. }
  2576. }
  2577. }
  2578. }
  2579. }
  // eventCronJob is a scheduled job bound to an event rule. The rule is loaded
  // by name each time the job runs.
  type eventCronJob struct {
  	// ruleName is the name of the event rule to execute.
  	ruleName string
  }
  2583. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2584. if rule.GuardFromConcurrentExecution() {
  2585. task, err := dataprovider.GetTaskByName(rule.Name)
  2586. if err != nil {
  2587. if errors.Is(err, util.ErrNotFound) {
  2588. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2589. task = dataprovider.Task{
  2590. Name: rule.Name,
  2591. UpdateAt: 0,
  2592. Version: 0,
  2593. }
  2594. err = dataprovider.AddTask(rule.Name)
  2595. if err != nil {
  2596. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2597. return task, err
  2598. }
  2599. } else {
  2600. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2601. }
  2602. }
  2603. return task, err
  2604. }
  2605. return dataprovider.Task{}, nil
  2606. }
  2607. func (j *eventCronJob) Run() {
  2608. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2609. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2610. if err != nil {
  2611. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2612. return
  2613. }
  2614. if err := rule.CheckActionsConsistency(""); err != nil {
  2615. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2616. return
  2617. }
  2618. task, err := j.getTask(&rule)
  2619. if err != nil {
  2620. return
  2621. }
  2622. if task.Name != "" {
  2623. updateInterval := 5 * time.Minute
  2624. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2625. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2626. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2627. return
  2628. }
  2629. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2630. if err != nil {
  2631. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2632. rule.Name, err)
  2633. return
  2634. }
  2635. ticker := time.NewTicker(updateInterval)
  2636. done := make(chan bool)
  2637. defer func() {
  2638. done <- true
  2639. ticker.Stop()
  2640. }()
  2641. go func(taskName string) {
  2642. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2643. for {
  2644. select {
  2645. case <-done:
  2646. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2647. return
  2648. case <-ticker.C:
  2649. err := dataprovider.UpdateTaskTimestamp(taskName)
  2650. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2651. }
  2652. }
  2653. }(task.Name)
  2654. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2655. } else {
  2656. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2657. }
  2658. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2659. }
  2660. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2661. func RunOnDemandRule(name string) error {
  2662. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2663. rule, err := dataprovider.EventRuleExists(name)
  2664. if err != nil {
  2665. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2666. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2667. }
  2668. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2669. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2670. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2671. }
  2672. if rule.Status != 1 {
  2673. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2674. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2675. }
  2676. if err := rule.CheckActionsConsistency(""); err != nil {
  2677. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2678. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2679. }
  2680. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2681. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2682. return nil
  2683. }
  // zipWriterWrapper groups a zip writer with a name and a set of entries.
  type zipWriterWrapper struct {
  	// Name identifies the archive; presumably its path or file name, confirm
  	// against the code that builds the wrapper.
  	Name string
  	// Entries is keyed by entry name; presumably it tracks the entries already
  	// added to the archive, confirm against usage.
  	Entries map[string]bool
  	// Writer is the underlying zip writer.
  	Writer *zip.Writer
  }
  2689. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2690. logger.Log(level, "eventmanager", "", format, v...)
  2691. }