eventmanager.go 89 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "time"
  37. "github.com/bmatcuk/doublestar/v4"
  38. "github.com/klauspost/compress/zip"
  39. "github.com/robfig/cron/v3"
  40. "github.com/rs/xid"
  41. "github.com/sftpgo/sdk"
  42. "github.com/wneessen/go-mail"
  43. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/plugin"
  46. "github.com/drakkan/sftpgo/v2/internal/smtp"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/vfs"
  49. )
  50. const (
  51. ipBlockedEventName = "IP Blocked"
  52. maxAttachmentsSize = int64(10 * 1024 * 1024)
  53. objDataPlaceholder = "{{ObjectData}}"
  54. )
  55. // Supported IDP login events
  56. const (
  57. IDPLoginUser = "IDP login user"
  58. IDPLoginAdmin = "IDP login admin"
  59. )
  60. var (
  61. // eventManager handle the supported event rules actions
  62. eventManager eventRulesContainer
  63. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  64. )
  65. func init() {
  66. eventManager = eventRulesContainer{
  67. schedulesMapping: make(map[string][]cron.EntryID),
  68. // arbitrary maximum number of concurrent asynchronous tasks,
  69. // each task could execute multiple actions
  70. concurrencyGuard: make(chan struct{}, 200),
  71. }
  72. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  73. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  74. p := EventParams{
  75. Name: executor,
  76. ObjectName: objectName,
  77. Event: operation,
  78. Status: 1,
  79. ObjectType: objectType,
  80. IP: ip,
  81. Role: role,
  82. Timestamp: time.Now().UnixNano(),
  83. Object: object,
  84. }
  85. if u, ok := object.(*dataprovider.User); ok {
  86. p.Email = u.Email
  87. } else if a, ok := object.(*dataprovider.Admin); ok {
  88. p.Email = a.Email
  89. }
  90. eventManager.handleProviderEvent(p)
  91. })
  92. }
  93. // HandleCertificateEvent checks and executes action rules for certificate events
  94. func HandleCertificateEvent(params EventParams) {
  95. eventManager.handleCertificateEvent(params)
  96. }
  97. // HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider
  98. func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
  99. return eventManager.handleIDPLoginEvent(params, customFields)
  100. }
  101. // eventRulesContainer stores event rules by trigger
  102. type eventRulesContainer struct {
  103. sync.RWMutex
  104. lastLoad atomic.Int64
  105. FsEvents []dataprovider.EventRule
  106. ProviderEvents []dataprovider.EventRule
  107. Schedules []dataprovider.EventRule
  108. IPBlockedEvents []dataprovider.EventRule
  109. CertificateEvents []dataprovider.EventRule
  110. IPDLoginEvents []dataprovider.EventRule
  111. schedulesMapping map[string][]cron.EntryID
  112. concurrencyGuard chan struct{}
  113. }
  114. func (r *eventRulesContainer) addAsyncTask() {
  115. activeHooks.Add(1)
  116. r.concurrencyGuard <- struct{}{}
  117. }
  118. func (r *eventRulesContainer) removeAsyncTask() {
  119. activeHooks.Add(-1)
  120. <-r.concurrencyGuard
  121. }
  122. func (r *eventRulesContainer) getLastLoadTime() int64 {
  123. return r.lastLoad.Load()
  124. }
  125. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  126. r.lastLoad.Store(modTime)
  127. }
  128. // RemoveRule deletes the rule with the specified name
  129. func (r *eventRulesContainer) RemoveRule(name string) {
  130. r.Lock()
  131. defer r.Unlock()
  132. r.removeRuleInternal(name)
  133. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  134. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  135. }
  136. func (r *eventRulesContainer) removeRuleInternal(name string) {
  137. for idx := range r.FsEvents {
  138. if r.FsEvents[idx].Name == name {
  139. lastIdx := len(r.FsEvents) - 1
  140. r.FsEvents[idx] = r.FsEvents[lastIdx]
  141. r.FsEvents = r.FsEvents[:lastIdx]
  142. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  143. return
  144. }
  145. }
  146. for idx := range r.ProviderEvents {
  147. if r.ProviderEvents[idx].Name == name {
  148. lastIdx := len(r.ProviderEvents) - 1
  149. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  150. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  151. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  152. return
  153. }
  154. }
  155. for idx := range r.IPBlockedEvents {
  156. if r.IPBlockedEvents[idx].Name == name {
  157. lastIdx := len(r.IPBlockedEvents) - 1
  158. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  159. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  160. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  161. return
  162. }
  163. }
  164. for idx := range r.CertificateEvents {
  165. if r.CertificateEvents[idx].Name == name {
  166. lastIdx := len(r.CertificateEvents) - 1
  167. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  168. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  169. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  170. return
  171. }
  172. }
  173. for idx := range r.IPDLoginEvents {
  174. if r.IPDLoginEvents[idx].Name == name {
  175. lastIdx := len(r.IPDLoginEvents) - 1
  176. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  177. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  178. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  179. return
  180. }
  181. }
  182. for idx := range r.Schedules {
  183. if r.Schedules[idx].Name == name {
  184. if schedules, ok := r.schedulesMapping[name]; ok {
  185. for _, entryID := range schedules {
  186. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  187. eventScheduler.Remove(entryID)
  188. }
  189. delete(r.schedulesMapping, name)
  190. }
  191. lastIdx := len(r.Schedules) - 1
  192. r.Schedules[idx] = r.Schedules[lastIdx]
  193. r.Schedules = r.Schedules[:lastIdx]
  194. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  195. return
  196. }
  197. }
  198. }
  199. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  200. r.removeRuleInternal(rule.Name)
  201. if rule.DeletedAt > 0 {
  202. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  203. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  204. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  205. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  206. }
  207. return
  208. }
  209. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  210. return
  211. }
  212. switch rule.Trigger {
  213. case dataprovider.EventTriggerFsEvent:
  214. r.FsEvents = append(r.FsEvents, rule)
  215. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  216. case dataprovider.EventTriggerProviderEvent:
  217. r.ProviderEvents = append(r.ProviderEvents, rule)
  218. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  219. case dataprovider.EventTriggerIPBlocked:
  220. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  221. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  222. case dataprovider.EventTriggerCertificate:
  223. r.CertificateEvents = append(r.CertificateEvents, rule)
  224. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  225. case dataprovider.EventTriggerIDPLogin:
  226. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  227. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  228. case dataprovider.EventTriggerSchedule:
  229. for _, schedule := range rule.Conditions.Schedules {
  230. cronSpec := schedule.GetCronSpec()
  231. job := &eventCronJob{
  232. ruleName: dataprovider.ConvertName(rule.Name),
  233. }
  234. entryID, err := eventScheduler.AddJob(cronSpec, job)
  235. if err != nil {
  236. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  237. return
  238. }
  239. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  240. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  241. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  242. }
  243. r.Schedules = append(r.Schedules, rule)
  244. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  245. default:
  246. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  247. }
  248. }
  249. func (r *eventRulesContainer) loadRules() {
  250. eventManagerLog(logger.LevelDebug, "loading updated rules")
  251. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  252. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  253. if err != nil {
  254. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  255. return
  256. }
  257. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  258. if len(rules) > 0 {
  259. r.Lock()
  260. defer r.Unlock()
  261. for _, rule := range rules {
  262. r.addUpdateRuleInternal(rule)
  263. }
  264. }
  265. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  266. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  267. r.setLastLoadTime(modTime)
  268. }
  269. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  270. switch conditions.IDPLoginEvent {
  271. case dataprovider.IDPLoginUser:
  272. if params.Event != IDPLoginUser {
  273. return false
  274. }
  275. case dataprovider.IDPLoginAdmin:
  276. if params.Event != IDPLoginAdmin {
  277. return false
  278. }
  279. }
  280. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  281. }
  282. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  283. if !util.Contains(conditions.ProviderEvents, params.Event) {
  284. return false
  285. }
  286. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  287. return false
  288. }
  289. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  290. return false
  291. }
  292. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  293. return false
  294. }
  295. return true
  296. }
  297. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  298. if !util.Contains(conditions.FsEvents, params.Event) {
  299. return false
  300. }
  301. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  302. return false
  303. }
  304. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  305. return false
  306. }
  307. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  308. return false
  309. }
  310. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  311. return false
  312. }
  313. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  314. return false
  315. }
  316. if params.Event == operationUpload || params.Event == operationDownload {
  317. if conditions.Options.MinFileSize > 0 {
  318. if params.FileSize < conditions.Options.MinFileSize {
  319. return false
  320. }
  321. }
  322. if conditions.Options.MaxFileSize > 0 {
  323. if params.FileSize > conditions.Options.MaxFileSize {
  324. return false
  325. }
  326. }
  327. }
  328. return true
  329. }
  330. // hasFsRules returns true if there are any rules for filesystem event triggers
  331. func (r *eventRulesContainer) hasFsRules() bool {
  332. r.RLock()
  333. defer r.RUnlock()
  334. return len(r.FsEvents) > 0
  335. }
  336. // handleFsEvent executes the rules actions defined for the specified event.
  337. // The boolean parameter indicates whether a sync action was executed
  338. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  339. if params.Protocol == protocolEventAction {
  340. return false, nil
  341. }
  342. r.RLock()
  343. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  344. for _, rule := range r.FsEvents {
  345. if r.checkFsEventMatch(&rule.Conditions, &params) {
  346. if err := rule.CheckActionsConsistency(""); err != nil {
  347. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  348. rule.Name, err, params.Event)
  349. continue
  350. }
  351. hasSyncActions := false
  352. for _, action := range rule.Actions {
  353. if action.Options.ExecuteSync {
  354. hasSyncActions = true
  355. break
  356. }
  357. }
  358. if hasSyncActions {
  359. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  360. } else {
  361. rulesAsync = append(rulesAsync, rule)
  362. }
  363. }
  364. }
  365. r.RUnlock()
  366. params.sender = params.Name
  367. if len(rulesAsync) > 0 {
  368. go executeAsyncRulesActions(rulesAsync, params)
  369. }
  370. if len(rulesWithSyncActions) > 0 {
  371. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  372. }
  373. return false, nil
  374. }
  375. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  376. *dataprovider.Admin, error,
  377. ) {
  378. r.RLock()
  379. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  380. for _, rule := range r.IPDLoginEvents {
  381. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  382. if err := rule.CheckActionsConsistency(""); err != nil {
  383. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  384. rule.Name, err, params.Event)
  385. continue
  386. }
  387. hasSyncActions := false
  388. for _, action := range rule.Actions {
  389. if action.Options.ExecuteSync {
  390. hasSyncActions = true
  391. break
  392. }
  393. }
  394. if hasSyncActions {
  395. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  396. } else {
  397. rulesAsync = append(rulesAsync, rule)
  398. }
  399. }
  400. }
  401. r.RUnlock()
  402. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  403. return nil, nil, nil
  404. }
  405. params.addIDPCustomFields(customFields)
  406. if len(rulesWithSyncActions) > 1 {
  407. var ruleNames []string
  408. for _, r := range rulesWithSyncActions {
  409. ruleNames = append(ruleNames, r.Name)
  410. }
  411. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  412. }
  413. if len(rulesAsync) > 0 {
  414. go executeAsyncRulesActions(rulesAsync, params)
  415. }
  416. if len(rulesWithSyncActions) > 0 {
  417. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  418. }
  419. return nil, nil, nil
  420. }
  421. // username is populated for user objects
  422. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  423. r.RLock()
  424. defer r.RUnlock()
  425. var rules []dataprovider.EventRule
  426. for _, rule := range r.ProviderEvents {
  427. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  428. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  429. rules = append(rules, rule)
  430. } else {
  431. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  432. rule.Name, err, params.Event, params.ObjectType)
  433. }
  434. }
  435. }
  436. if len(rules) > 0 {
  437. params.sender = params.ObjectName
  438. go executeAsyncRulesActions(rules, params)
  439. }
  440. }
  441. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  442. r.RLock()
  443. defer r.RUnlock()
  444. if len(r.IPBlockedEvents) == 0 {
  445. return
  446. }
  447. var rules []dataprovider.EventRule
  448. for _, rule := range r.IPBlockedEvents {
  449. if err := rule.CheckActionsConsistency(""); err == nil {
  450. rules = append(rules, rule)
  451. } else {
  452. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  453. rule.Name, err, params.Event)
  454. }
  455. }
  456. if len(rules) > 0 {
  457. go executeAsyncRulesActions(rules, params)
  458. }
  459. }
  460. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  461. r.RLock()
  462. defer r.RUnlock()
  463. if len(r.CertificateEvents) == 0 {
  464. return
  465. }
  466. var rules []dataprovider.EventRule
  467. for _, rule := range r.CertificateEvents {
  468. if err := rule.CheckActionsConsistency(""); err == nil {
  469. rules = append(rules, rule)
  470. } else {
  471. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  472. rule.Name, err, params.Event)
  473. }
  474. }
  475. if len(rules) > 0 {
  476. go executeAsyncRulesActions(rules, params)
  477. }
  478. }
  479. type executedRetentionCheck struct {
  480. Username string
  481. ActionName string
  482. Results []folderRetentionCheckResult
  483. }
  484. // EventParams defines the supported event parameters
  485. type EventParams struct {
  486. Name string
  487. Groups []sdk.GroupMapping
  488. Event string
  489. Status int
  490. VirtualPath string
  491. FsPath string
  492. VirtualTargetPath string
  493. FsTargetPath string
  494. ObjectName string
  495. ObjectType string
  496. FileSize int64
  497. Elapsed int64
  498. Protocol string
  499. IP string
  500. Role string
  501. Email string
  502. Timestamp int64
  503. IDPCustomFields *map[string]string
  504. Object plugin.Renderer
  505. sender string
  506. updateStatusFromError bool
  507. errors []string
  508. retentionChecks []executedRetentionCheck
  509. }
  510. func (p *EventParams) getACopy() *EventParams {
  511. params := *p
  512. params.errors = make([]string, len(p.errors))
  513. copy(params.errors, p.errors)
  514. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  515. for _, c := range p.retentionChecks {
  516. executedCheck := executedRetentionCheck{
  517. Username: c.Username,
  518. ActionName: c.ActionName,
  519. }
  520. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  521. copy(executedCheck.Results, c.Results)
  522. retentionChecks = append(retentionChecks, executedCheck)
  523. }
  524. params.retentionChecks = retentionChecks
  525. if p.IDPCustomFields != nil {
  526. fields := make(map[string]string)
  527. for k, v := range *p.IDPCustomFields {
  528. fields[k] = v
  529. }
  530. params.IDPCustomFields = &fields
  531. }
  532. return &params
  533. }
  534. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  535. if customFields == nil {
  536. return
  537. }
  538. fields := make(map[string]string)
  539. for k, v := range *customFields {
  540. switch val := v.(type) {
  541. case string:
  542. fields[k] = val
  543. }
  544. }
  545. p.IDPCustomFields = &fields
  546. }
  547. // AddError adds a new error to the event params and update the status if needed
  548. func (p *EventParams) AddError(err error) {
  549. if err == nil {
  550. return
  551. }
  552. if p.updateStatusFromError && p.Status == 1 {
  553. p.Status = 2
  554. }
  555. p.errors = append(p.errors, err.Error())
  556. }
  557. func (p *EventParams) setBackupParams(backupPath string) {
  558. if p.sender != "" {
  559. return
  560. }
  561. p.sender = dataprovider.ActionExecutorSystem
  562. p.FsPath = backupPath
  563. p.ObjectName = filepath.Base(backupPath)
  564. p.VirtualPath = "/" + p.ObjectName
  565. p.Timestamp = time.Now().UnixNano()
  566. info, err := os.Stat(backupPath)
  567. if err == nil {
  568. p.FileSize = info.Size()
  569. }
  570. }
  571. func (p *EventParams) getStatusString() string {
  572. switch p.Status {
  573. case 1:
  574. return "OK"
  575. default:
  576. return "KO"
  577. }
  578. }
  579. // getUsers returns users with group settings not applied
  580. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  581. if p.sender == "" {
  582. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeUsers})
  583. if err != nil {
  584. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  585. return nil, errors.New("unable to get users")
  586. }
  587. return dump.Users, nil
  588. }
  589. user, err := p.getUserFromSender()
  590. if err != nil {
  591. return nil, err
  592. }
  593. return []dataprovider.User{user}, nil
  594. }
  595. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  596. if p.sender == dataprovider.ActionExecutorSystem {
  597. return dataprovider.User{
  598. BaseUser: sdk.BaseUser{
  599. Status: 1,
  600. Username: p.sender,
  601. HomeDir: dataprovider.GetBackupsPath(),
  602. Permissions: map[string][]string{
  603. "/": {dataprovider.PermAny},
  604. },
  605. },
  606. }, nil
  607. }
  608. user, err := dataprovider.UserExists(p.sender, "")
  609. if err != nil {
  610. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  611. return user, fmt.Errorf("error getting user %q", p.sender)
  612. }
  613. return user, nil
  614. }
  615. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  616. if p.sender == "" {
  617. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeFolders})
  618. return dump.Folders, err
  619. }
  620. folder, err := dataprovider.GetFolderByName(p.sender)
  621. if err != nil {
  622. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  623. }
  624. return []vfs.BaseVirtualFolder{folder}, nil
  625. }
  626. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  627. if len(p.retentionChecks) == 0 {
  628. return nil, errors.New("no data retention report available")
  629. }
  630. var b bytes.Buffer
  631. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  632. return nil, err
  633. }
  634. return b.Bytes(), nil
  635. }
  636. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  637. var n int64
  638. wr := zip.NewWriter(w)
  639. for _, check := range p.retentionChecks {
  640. data, err := getCSVRetentionReport(check.Results)
  641. if err != nil {
  642. return n, fmt.Errorf("unable to get CSV report: %w", err)
  643. }
  644. dataSize := int64(len(data))
  645. n += dataSize
  646. // we suppose a 3:1 compression ratio
  647. if n > (maxAttachmentsSize * 3) {
  648. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  649. util.ByteCountIEC(n))
  650. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  651. }
  652. fh := &zip.FileHeader{
  653. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  654. Method: zip.Deflate,
  655. Modified: time.Now().UTC(),
  656. }
  657. f, err := wr.CreateHeader(fh)
  658. if err != nil {
  659. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  660. }
  661. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  662. if err != nil {
  663. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  664. }
  665. }
  666. if err := wr.Close(); err != nil {
  667. return n, fmt.Errorf("unable to close zip writer: %w", err)
  668. }
  669. return n, nil
  670. }
  671. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  672. if len(p.retentionChecks) == 0 {
  673. return nil, errors.New("no data retention report available")
  674. }
  675. return &mail.File{
  676. Name: "retention-reports.zip",
  677. Header: make(map[string][]string),
  678. Writer: p.writeCompressedDataRetentionReports,
  679. }, nil
  680. }
  681. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  682. if jsonEscaped {
  683. return util.JSONEscape(val)
  684. }
  685. return val
  686. }
  687. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  688. replacements := []string{
  689. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  690. "{{Event}}", p.Event,
  691. "{{Status}}", fmt.Sprintf("%d", p.Status),
  692. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  693. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  694. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  695. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  696. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  697. "{{ObjectType}}", p.ObjectType,
  698. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  699. "{{Elapsed}}", fmt.Sprintf("%d", p.Elapsed),
  700. "{{Protocol}}", p.Protocol,
  701. "{{IP}}", p.IP,
  702. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  703. "{{Email}}", p.getStringReplacement(p.Email, jsonEscaped),
  704. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  705. "{{StatusString}}", p.getStatusString(),
  706. }
  707. if p.VirtualPath != "" {
  708. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  709. }
  710. if p.VirtualTargetPath != "" {
  711. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  712. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  713. }
  714. if len(p.errors) > 0 {
  715. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  716. } else {
  717. replacements = append(replacements, "{{ErrorString}}", "")
  718. }
  719. replacements = append(replacements, objDataPlaceholder, "")
  720. if addObjectData {
  721. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  722. if err == nil {
  723. replacements[len(replacements)-1] = p.getStringReplacement(string(data), jsonEscaped)
  724. }
  725. }
  726. if p.IDPCustomFields != nil {
  727. for k, v := range *p.IDPCustomFields {
  728. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  729. }
  730. }
  731. return replacements
  732. }
  733. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  734. var b bytes.Buffer
  735. csvWriter := csv.NewWriter(&b)
  736. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  737. "elapsed (ms)", "info", "error"})
  738. if err != nil {
  739. return nil, err
  740. }
  741. for _, result := range results {
  742. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  743. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  744. result.Info, result.Error})
  745. if err != nil {
  746. return nil, err
  747. }
  748. }
  749. csvWriter.Flush()
  750. err = csvWriter.Error()
  751. return b.Bytes(), err
  752. }
  753. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
  754. numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
  755. ) error {
  756. var fsDstPath string
  757. var errDstFs error
  758. errWrite := w.Close()
  759. targetPath := virtualSourcePath
  760. if virtualTargetPath != "" {
  761. targetPath = virtualTargetPath
  762. var fsDst vfs.Fs
  763. fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
  764. if errTransfer != nil && errDstFs == nil {
  765. // try to remove a partial file on error. If this fails, we can't do anything
  766. errRemove := fsDst.Remove(fsDstPath, false)
  767. conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
  768. }
  769. }
  770. info, err := conn.doStatInternal(targetPath, 0, false, false)
  771. if err == nil {
  772. updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
  773. var fsSrcPath string
  774. var errSrcFs error
  775. if virtualSourcePath != "" {
  776. _, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
  777. }
  778. if errSrcFs == nil && errDstFs == nil {
  779. elapsed := time.Since(startTime).Nanoseconds() / 1000000
  780. if errTransfer == nil {
  781. errTransfer = errWrite
  782. }
  783. if operation == operationCopy {
  784. logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
  785. "", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
  786. }
  787. ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed) //nolint:errcheck
  788. }
  789. } else {
  790. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
  791. }
  792. if errTransfer != nil {
  793. return errTransfer
  794. }
  795. return errWrite
  796. }
  797. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  798. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  799. if err != nil {
  800. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  801. return
  802. }
  803. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  804. if vfolder.IsIncludedInUserQuota() {
  805. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  806. }
  807. }
  808. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  809. if numFiles == 0 {
  810. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  811. return conn.GetPermissionDeniedError()
  812. }
  813. } else {
  814. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  815. return conn.GetPermissionDeniedError()
  816. }
  817. }
  818. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  819. if !q.HasSpace {
  820. return conn.GetQuotaExceededError()
  821. }
  822. if expectedSize != -1 {
  823. sizeDiff := expectedSize - truncatedSize
  824. if sizeDiff > 0 {
  825. remainingSize := q.GetRemainingSize()
  826. if remainingSize > 0 && remainingSize < sizeDiff {
  827. return conn.GetQuotaExceededError()
  828. }
  829. }
  830. }
  831. return nil
  832. }
  833. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  834. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  835. if err != nil {
  836. return nil, 0, 0, nil, err
  837. }
  838. var truncatedSize, fileSize int64
  839. numFiles := 1
  840. isFileOverwrite := false
  841. info, err := fs.Lstat(fsPath)
  842. if err == nil {
  843. fileSize = info.Size()
  844. if info.IsDir() {
  845. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  846. }
  847. if info.Mode().IsRegular() {
  848. isFileOverwrite = true
  849. truncatedSize = fileSize
  850. }
  851. numFiles = 0
  852. }
  853. if err != nil && !fs.IsNotExist(err) {
  854. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  855. }
  856. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  857. return nil, numFiles, truncatedSize, nil, err
  858. }
  859. f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1))
  860. if err != nil {
  861. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  862. }
  863. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  864. if isFileOverwrite {
  865. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  866. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  867. truncatedSize = 0
  868. }
  869. }
  870. if cancelFn == nil {
  871. cancelFn = func() {}
  872. }
  873. if f != nil {
  874. return f, numFiles, truncatedSize, cancelFn, nil
  875. }
  876. return w, numFiles, truncatedSize, cancelFn, nil
  877. }
  878. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  879. if entryPath == wr.Name {
  880. // skip the archive itself
  881. return nil
  882. }
  883. info, err := conn.DoStat(entryPath, 1, false)
  884. if err != nil {
  885. eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
  886. return err
  887. }
  888. entryName, err := getZipEntryName(entryPath, baseDir)
  889. if err != nil {
  890. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  891. return err
  892. }
  893. if _, ok := wr.Entries[entryName]; ok {
  894. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  895. return nil
  896. }
  897. wr.Entries[entryName] = true
  898. if info.IsDir() {
  899. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  900. Name: entryName + "/",
  901. Method: zip.Deflate,
  902. Modified: info.ModTime(),
  903. })
  904. if err != nil {
  905. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  906. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  907. }
  908. contents, err := conn.ListDir(entryPath)
  909. if err != nil {
  910. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  911. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  912. }
  913. for _, info := range contents {
  914. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  915. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  916. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  917. return err
  918. }
  919. }
  920. return nil
  921. }
  922. if !info.Mode().IsRegular() {
  923. // we only allow regular files
  924. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  925. return nil
  926. }
  927. reader, cancelFn, err := getFileReader(conn, entryPath)
  928. if err != nil {
  929. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  930. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  931. }
  932. defer cancelFn()
  933. defer reader.Close()
  934. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  935. Name: entryName,
  936. Method: zip.Deflate,
  937. Modified: info.ModTime(),
  938. })
  939. if err != nil {
  940. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  941. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  942. }
  943. _, err = io.Copy(f, reader)
  944. return err
  945. }
  946. func getZipEntryName(entryPath, baseDir string) (string, error) {
  947. if !strings.HasPrefix(entryPath, baseDir) {
  948. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  949. }
  950. entryPath = strings.TrimPrefix(entryPath, baseDir)
  951. return strings.TrimPrefix(entryPath, "/"), nil
  952. }
  953. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  954. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  955. return nil, nil, conn.GetPermissionDeniedError()
  956. }
  957. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  958. if err != nil {
  959. return nil, nil, err
  960. }
  961. f, r, cancelFn, err := fs.Open(fsPath, 0)
  962. if err != nil {
  963. return nil, nil, conn.GetFsError(fs, err)
  964. }
  965. if cancelFn == nil {
  966. cancelFn = func() {}
  967. }
  968. if f != nil {
  969. return f, cancelFn, nil
  970. }
  971. return r, cancelFn, nil
  972. }
  973. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  974. reader, cancelFn, err := getFileReader(conn, virtualPath)
  975. if err != nil {
  976. return err
  977. }
  978. defer cancelFn()
  979. defer reader.Close()
  980. _, err = io.Copy(w, reader)
  981. return err
  982. }
  983. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  984. return func(w io.Writer) (int64, error) {
  985. reader, cancelFn, err := getFileReader(conn, virtualPath)
  986. if err != nil {
  987. return 0, err
  988. }
  989. defer cancelFn()
  990. defer reader.Close()
  991. return io.CopyN(w, reader, size)
  992. }
  993. }
  994. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  995. var files []*mail.File
  996. totalSize := int64(0)
  997. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  998. info, err := conn.DoStat(virtualPath, 0, false)
  999. if err != nil {
  1000. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  1001. }
  1002. if !info.Mode().IsRegular() {
  1003. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  1004. }
  1005. totalSize += info.Size()
  1006. if totalSize > maxAttachmentsSize {
  1007. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  1008. }
  1009. files = append(files, &mail.File{
  1010. Name: path.Base(virtualPath),
  1011. Header: make(map[string][]string),
  1012. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1013. })
  1014. }
  1015. return files, nil
  1016. }
  1017. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  1018. if !strings.Contains(input, "{{") {
  1019. return input
  1020. }
  1021. return replacer.Replace(input)
  1022. }
  1023. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1024. var matched bool
  1025. var err error
  1026. if strings.Contains(p.Pattern, "**") {
  1027. matched, err = doublestar.Match(p.Pattern, name)
  1028. } else {
  1029. matched, err = path.Match(p.Pattern, name)
  1030. }
  1031. if err != nil {
  1032. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1033. return false
  1034. }
  1035. if p.InverseMatch {
  1036. return !matched
  1037. }
  1038. return matched
  1039. }
  1040. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1041. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1042. return false
  1043. }
  1044. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1045. return false
  1046. }
  1047. if !checkEventGroupConditionPatterns(user.Groups, conditions.GroupNames) {
  1048. return false
  1049. }
  1050. return true
  1051. }
  1052. // checkConditionPatterns returns false if patterns are defined and no match is found
  1053. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1054. if len(patterns) == 0 {
  1055. return true
  1056. }
  1057. matches := false
  1058. for _, p := range patterns {
  1059. // assume, that multiple InverseMatches are set
  1060. if p.InverseMatch {
  1061. if checkEventConditionPattern(p, name) {
  1062. matches = true
  1063. } else {
  1064. return false
  1065. }
  1066. } else if checkEventConditionPattern(p, name) {
  1067. return true
  1068. }
  1069. }
  1070. return matches
  1071. }
  1072. func checkEventGroupConditionPatterns(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1073. if len(patterns) == 0 {
  1074. return true
  1075. }
  1076. for _, group := range groups {
  1077. if checkEventConditionPatterns(group.Name, patterns) {
  1078. return true
  1079. }
  1080. }
  1081. return false
  1082. }
  1083. func getHTTPRuleActionEndpoint(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1084. u, err := url.Parse(c.Endpoint)
  1085. if err != nil {
  1086. return "", fmt.Errorf("invalid endpoint: %w", err)
  1087. }
  1088. if strings.Contains(u.Path, "{{") {
  1089. pathComponents := strings.Split(u.Path, "/")
  1090. for idx := range pathComponents {
  1091. part := replaceWithReplacer(pathComponents[idx], replacer)
  1092. if part != pathComponents[idx] {
  1093. pathComponents[idx] = url.PathEscape(part)
  1094. }
  1095. }
  1096. u.Path = ""
  1097. u = u.JoinPath(pathComponents...)
  1098. }
  1099. if len(c.QueryParameters) > 0 {
  1100. q := u.Query()
  1101. for _, keyVal := range c.QueryParameters {
  1102. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1103. }
  1104. u.RawQuery = q.Encode()
  1105. }
  1106. return u.String(), nil
  1107. }
  1108. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1109. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1110. ) error {
  1111. partWriter, err := m.CreatePart(h)
  1112. if err != nil {
  1113. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1114. return err
  1115. }
  1116. if part.Body != "" {
  1117. cType := h.Get("Content-Type")
  1118. if part.Body != objDataPlaceholder && strings.Contains(strings.ToLower(cType), "application/json") {
  1119. replacements := params.getStringReplacements(addObjectData, true)
  1120. jsonReplacer := strings.NewReplacer(replacements...)
  1121. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, jsonReplacer)))
  1122. } else {
  1123. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  1124. }
  1125. if err != nil {
  1126. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1127. return err
  1128. }
  1129. return nil
  1130. }
  1131. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1132. data, err := params.getCompressedDataRetentionReport()
  1133. if err != nil {
  1134. return err
  1135. }
  1136. _, err = partWriter.Write(data)
  1137. if err != nil {
  1138. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1139. return err
  1140. }
  1141. return nil
  1142. }
  1143. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1144. if err != nil {
  1145. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1146. return err
  1147. }
  1148. return nil
  1149. }
  1150. func jsonEscapeRuleActionBody(c *dataprovider.EventActionHTTPConfig) bool {
  1151. return c.Body != objDataPlaceholder && c.HasJSONBody()
  1152. }
  1153. func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1154. cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
  1155. ) (io.Reader, string, error) {
  1156. var body io.Reader
  1157. if c.Method == http.MethodGet {
  1158. return body, "", nil
  1159. }
  1160. if c.Body != "" {
  1161. if c.Body == dataprovider.RetentionReportPlaceHolder {
  1162. data, err := params.getCompressedDataRetentionReport()
  1163. if err != nil {
  1164. return body, "", err
  1165. }
  1166. return bytes.NewBuffer(data), "", nil
  1167. }
  1168. if jsonEscapeRuleActionBody(c) {
  1169. replacements := params.getStringReplacements(addObjectData, true)
  1170. jsonReplacer := strings.NewReplacer(replacements...)
  1171. return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
  1172. }
  1173. return bytes.NewBufferString(replaceWithReplacer(c.Body, replacer)), "", nil
  1174. }
  1175. if len(c.Parts) > 0 {
  1176. r, w := io.Pipe()
  1177. m := multipart.NewWriter(w)
  1178. var conn *BaseConnection
  1179. if user.Username != "" {
  1180. var err error
  1181. user, err = getUserForEventAction(user)
  1182. if err != nil {
  1183. return body, "", err
  1184. }
  1185. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1186. err = user.CheckFsRoot(connectionID)
  1187. if err != nil {
  1188. user.CloseFs() //nolint:errcheck
  1189. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  1190. user.Username, err)
  1191. }
  1192. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1193. }
  1194. go func() {
  1195. defer w.Close()
  1196. defer user.CloseFs() //nolint:errcheck
  1197. for _, part := range c.Parts {
  1198. h := make(textproto.MIMEHeader)
  1199. if part.Body != "" {
  1200. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  1201. } else {
  1202. h.Set("Content-Disposition",
  1203. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  1204. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  1205. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  1206. if contentType == "" {
  1207. contentType = "application/octet-stream"
  1208. }
  1209. h.Set("Content-Type", contentType)
  1210. }
  1211. for _, keyVal := range part.Headers {
  1212. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1213. }
  1214. if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
  1215. cancel()
  1216. return
  1217. }
  1218. }
  1219. m.Close()
  1220. }()
  1221. return r, m.FormDataContentType(), nil
  1222. }
  1223. return body, "", nil
  1224. }
  1225. func setHTTPReqHeaders(req *http.Request, c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1226. contentType string,
  1227. ) {
  1228. if contentType != "" {
  1229. req.Header.Set("Content-Type", contentType)
  1230. }
  1231. if c.Username != "" || c.Password.GetPayload() != "" {
  1232. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1233. }
  1234. for _, keyVal := range c.Headers {
  1235. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1236. }
  1237. }
  1238. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1239. if err := c.TryDecryptPassword(); err != nil {
  1240. return err
  1241. }
  1242. addObjectData := false
  1243. if params.Object != nil {
  1244. addObjectData = c.HasObjectData()
  1245. }
  1246. replacements := params.getStringReplacements(addObjectData, false)
  1247. replacer := strings.NewReplacer(replacements...)
  1248. endpoint, err := getHTTPRuleActionEndpoint(&c, replacer)
  1249. if err != nil {
  1250. return err
  1251. }
  1252. ctx, cancel := c.GetContext()
  1253. defer cancel()
  1254. var user dataprovider.User
  1255. if c.HasMultipartFiles() {
  1256. user, err = params.getUserFromSender()
  1257. if err != nil {
  1258. return err
  1259. }
  1260. }
  1261. body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
  1262. if err != nil {
  1263. return err
  1264. }
  1265. if body != nil {
  1266. rc, ok := body.(io.ReadCloser)
  1267. if ok {
  1268. defer rc.Close()
  1269. }
  1270. }
  1271. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1272. if err != nil {
  1273. return err
  1274. }
  1275. setHTTPReqHeaders(req, &c, replacer, contentType)
  1276. client := c.GetHTTPClient()
  1277. defer client.CloseIdleConnections()
  1278. startTime := time.Now()
  1279. resp, err := client.Do(req)
  1280. if err != nil {
  1281. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1282. endpoint, time.Since(startTime), err)
  1283. return fmt.Errorf("error sending HTTP request: %w", err)
  1284. }
  1285. defer resp.Body.Close()
  1286. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1287. endpoint, time.Since(startTime), resp.StatusCode)
  1288. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1289. if rb, err := io.ReadAll(io.LimitReader(resp.Body, 2048)); err == nil {
  1290. eventManagerLog(logger.LevelDebug, "error notification response from endpoint %q: %s", endpoint, string(rb))
  1291. }
  1292. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1293. }
  1294. return nil
  1295. }
  1296. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1297. addObjectData := false
  1298. if params.Object != nil {
  1299. for _, k := range c.EnvVars {
  1300. if strings.Contains(k.Value, objDataPlaceholder) {
  1301. addObjectData = true
  1302. break
  1303. }
  1304. }
  1305. }
  1306. replacements := params.getStringReplacements(addObjectData, false)
  1307. replacer := strings.NewReplacer(replacements...)
  1308. args := make([]string, 0, len(c.Args))
  1309. for _, arg := range c.Args {
  1310. args = append(args, replaceWithReplacer(arg, replacer))
  1311. }
  1312. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1313. defer cancel()
  1314. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1315. cmd.Env = []string{}
  1316. for _, keyVal := range c.EnvVars {
  1317. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1318. }
  1319. startTime := time.Now()
  1320. err := cmd.Run()
  1321. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1322. c.Cmd, time.Since(startTime), err)
  1323. return err
  1324. }
  1325. func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) []string {
  1326. if len(addrs) == 0 {
  1327. return nil
  1328. }
  1329. recipients := make([]string, 0, len(addrs))
  1330. for _, recipient := range addrs {
  1331. rcpt := replaceWithReplacer(recipient, replacer)
  1332. if rcpt != "" {
  1333. recipients = append(recipients, rcpt)
  1334. }
  1335. }
  1336. return recipients
  1337. }
  1338. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1339. addObjectData := false
  1340. if params.Object != nil {
  1341. if strings.Contains(c.Body, objDataPlaceholder) {
  1342. addObjectData = true
  1343. }
  1344. }
  1345. replacements := params.getStringReplacements(addObjectData, false)
  1346. replacer := strings.NewReplacer(replacements...)
  1347. body := replaceWithReplacer(c.Body, replacer)
  1348. subject := replaceWithReplacer(c.Subject, replacer)
  1349. recipients := getEmailAddressesWithReplacer(c.Recipients, replacer)
  1350. bcc := getEmailAddressesWithReplacer(c.Bcc, replacer)
  1351. startTime := time.Now()
  1352. var files []*mail.File
  1353. fileAttachments := make([]string, 0, len(c.Attachments))
  1354. for _, attachment := range c.Attachments {
  1355. if attachment == dataprovider.RetentionReportPlaceHolder {
  1356. f, err := params.getRetentionReportsAsMailAttachment()
  1357. if err != nil {
  1358. return err
  1359. }
  1360. files = append(files, f)
  1361. continue
  1362. }
  1363. fileAttachments = append(fileAttachments, attachment)
  1364. }
  1365. if len(fileAttachments) > 0 {
  1366. user, err := params.getUserFromSender()
  1367. if err != nil {
  1368. return err
  1369. }
  1370. user, err = getUserForEventAction(user)
  1371. if err != nil {
  1372. return err
  1373. }
  1374. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1375. err = user.CheckFsRoot(connectionID)
  1376. defer user.CloseFs() //nolint:errcheck
  1377. if err != nil {
  1378. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1379. }
  1380. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1381. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1382. if err != nil {
  1383. return err
  1384. }
  1385. files = append(files, res...)
  1386. }
  1387. err := smtp.SendEmail(recipients, bcc, subject, body, smtp.EmailContentType(c.ContentType), files...)
  1388. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1389. time.Since(startTime), err)
  1390. if err != nil {
  1391. return fmt.Errorf("unable to send email: %w", err)
  1392. }
  1393. return nil
  1394. }
  1395. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1396. err := user.LoadAndApplyGroupSettings()
  1397. if err != nil {
  1398. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1399. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1400. }
  1401. user.UploadDataTransfer = 0
  1402. user.UploadBandwidth = 0
  1403. user.DownloadBandwidth = 0
  1404. user.Filters.DisableFsChecks = false
  1405. user.Filters.FilePatterns = nil
  1406. user.Filters.BandwidthLimits = nil
  1407. for k := range user.Permissions {
  1408. user.Permissions[k] = []string{dataprovider.PermAny}
  1409. }
  1410. return user, nil
  1411. }
  1412. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1413. results := make([]string, 0, len(paths))
  1414. for _, p := range paths {
  1415. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1416. }
  1417. return util.RemoveDuplicates(results, false)
  1418. }
  1419. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1420. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1421. if err != nil {
  1422. return err
  1423. }
  1424. return conn.RemoveFile(fs, fsPath, item, info)
  1425. }
  1426. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1427. user, err := getUserForEventAction(user)
  1428. if err != nil {
  1429. return err
  1430. }
  1431. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1432. err = user.CheckFsRoot(connectionID)
  1433. defer user.CloseFs() //nolint:errcheck
  1434. if err != nil {
  1435. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1436. }
  1437. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1438. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1439. info, err := conn.DoStat(item, 0, false)
  1440. if err != nil {
  1441. if conn.IsNotExistError(err) {
  1442. continue
  1443. }
  1444. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1445. }
  1446. if info.IsDir() {
  1447. if err = conn.RemoveDir(item); err != nil {
  1448. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1449. }
  1450. } else {
  1451. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1452. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1453. }
  1454. }
  1455. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1456. }
  1457. return nil
  1458. }
  1459. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1460. conditions dataprovider.ConditionOptions, params *EventParams,
  1461. ) error {
  1462. users, err := params.getUsers()
  1463. if err != nil {
  1464. return fmt.Errorf("unable to get users: %w", err)
  1465. }
  1466. var failures []string
  1467. executed := 0
  1468. for _, user := range users {
  1469. // if sender is set, the conditions have already been evaluated
  1470. if params.sender == "" {
  1471. if !checkUserConditionOptions(&user, &conditions) {
  1472. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1473. user.Username)
  1474. continue
  1475. }
  1476. }
  1477. executed++
  1478. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1479. params.AddError(err)
  1480. failures = append(failures, user.Username)
  1481. }
  1482. }
  1483. if len(failures) > 0 {
  1484. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1485. }
  1486. if executed == 0 {
  1487. eventManagerLog(logger.LevelError, "no delete executed")
  1488. return errors.New("no delete executed")
  1489. }
  1490. return nil
  1491. }
  1492. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1493. user, err := getUserForEventAction(user)
  1494. if err != nil {
  1495. return err
  1496. }
  1497. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1498. err = user.CheckFsRoot(connectionID)
  1499. defer user.CloseFs() //nolint:errcheck
  1500. if err != nil {
  1501. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1502. }
  1503. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1504. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1505. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1506. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1507. }
  1508. if err = conn.createDirIfMissing(item); err != nil {
  1509. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1510. }
  1511. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1512. }
  1513. return nil
  1514. }
  1515. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1516. conditions dataprovider.ConditionOptions, params *EventParams,
  1517. ) error {
  1518. users, err := params.getUsers()
  1519. if err != nil {
  1520. return fmt.Errorf("unable to get users: %w", err)
  1521. }
  1522. var failures []string
  1523. executed := 0
  1524. for _, user := range users {
  1525. // if sender is set, the conditions have already been evaluated
  1526. if params.sender == "" {
  1527. if !checkUserConditionOptions(&user, &conditions) {
  1528. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1529. user.Username)
  1530. continue
  1531. }
  1532. }
  1533. executed++
  1534. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1535. failures = append(failures, user.Username)
  1536. }
  1537. }
  1538. if len(failures) > 0 {
  1539. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1540. }
  1541. if executed == 0 {
  1542. eventManagerLog(logger.LevelError, "no mkdir executed")
  1543. return errors.New("no mkdir executed")
  1544. }
  1545. return nil
  1546. }
  1547. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1548. user dataprovider.User,
  1549. ) error {
  1550. user, err := getUserForEventAction(user)
  1551. if err != nil {
  1552. return err
  1553. }
  1554. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1555. err = user.CheckFsRoot(connectionID)
  1556. defer user.CloseFs() //nolint:errcheck
  1557. if err != nil {
  1558. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1559. }
  1560. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1561. for _, item := range renames {
  1562. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1563. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1564. if err = conn.renameInternal(source, target, true); err != nil {
  1565. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1566. }
  1567. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1568. }
  1569. return nil
  1570. }
  1571. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1572. user dataprovider.User,
  1573. ) error {
  1574. user, err := getUserForEventAction(user)
  1575. if err != nil {
  1576. return err
  1577. }
  1578. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1579. err = user.CheckFsRoot(connectionID)
  1580. defer user.CloseFs() //nolint:errcheck
  1581. if err != nil {
  1582. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1583. }
  1584. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1585. for _, item := range copy {
  1586. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1587. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1588. if strings.HasSuffix(item.Key, "/") {
  1589. source += "/"
  1590. }
  1591. if strings.HasSuffix(item.Value, "/") {
  1592. target += "/"
  1593. }
  1594. if err = conn.Copy(source, target); err != nil {
  1595. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1596. }
  1597. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1598. }
  1599. return nil
  1600. }
  1601. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1602. user dataprovider.User,
  1603. ) error {
  1604. user, err := getUserForEventAction(user)
  1605. if err != nil {
  1606. return err
  1607. }
  1608. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1609. err = user.CheckFsRoot(connectionID)
  1610. defer user.CloseFs() //nolint:errcheck
  1611. if err != nil {
  1612. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1613. }
  1614. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1615. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1616. if _, err = conn.DoStat(item, 0, false); err != nil {
  1617. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1618. }
  1619. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1620. }
  1621. return nil
  1622. }
  1623. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1624. conditions dataprovider.ConditionOptions, params *EventParams,
  1625. ) error {
  1626. users, err := params.getUsers()
  1627. if err != nil {
  1628. return fmt.Errorf("unable to get users: %w", err)
  1629. }
  1630. var failures []string
  1631. executed := 0
  1632. for _, user := range users {
  1633. // if sender is set, the conditions have already been evaluated
  1634. if params.sender == "" {
  1635. if !checkUserConditionOptions(&user, &conditions) {
  1636. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1637. user.Username)
  1638. continue
  1639. }
  1640. }
  1641. executed++
  1642. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1643. failures = append(failures, user.Username)
  1644. params.AddError(err)
  1645. }
  1646. }
  1647. if len(failures) > 0 {
  1648. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1649. }
  1650. if executed == 0 {
  1651. eventManagerLog(logger.LevelError, "no rename executed")
  1652. return errors.New("no rename executed")
  1653. }
  1654. return nil
  1655. }
  1656. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1657. conditions dataprovider.ConditionOptions, params *EventParams,
  1658. ) error {
  1659. users, err := params.getUsers()
  1660. if err != nil {
  1661. return fmt.Errorf("unable to get users: %w", err)
  1662. }
  1663. var failures []string
  1664. var executed int
  1665. for _, user := range users {
  1666. // if sender is set, the conditions have already been evaluated
  1667. if params.sender == "" {
  1668. if !checkUserConditionOptions(&user, &conditions) {
  1669. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1670. user.Username)
  1671. continue
  1672. }
  1673. }
  1674. executed++
  1675. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1676. failures = append(failures, user.Username)
  1677. params.AddError(err)
  1678. }
  1679. }
  1680. if len(failures) > 0 {
  1681. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1682. }
  1683. if executed == 0 {
  1684. eventManagerLog(logger.LevelError, "no copy executed")
  1685. return errors.New("no copy executed")
  1686. }
  1687. return nil
  1688. }
  1689. func getArchiveBaseDir(paths []string) string {
  1690. var parentDirs []string
  1691. for _, p := range paths {
  1692. parentDirs = append(parentDirs, path.Dir(p))
  1693. }
  1694. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1695. baseDir := "/"
  1696. if len(parentDirs) == 1 {
  1697. baseDir = parentDirs[0]
  1698. }
  1699. return baseDir
  1700. }
  1701. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1702. if info.IsDir() {
  1703. var dirSize int64
  1704. entries, err := conn.ListDir(p)
  1705. if err != nil {
  1706. return 0, err
  1707. }
  1708. for _, entry := range entries {
  1709. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1710. if err != nil {
  1711. return 0, err
  1712. }
  1713. dirSize += size
  1714. }
  1715. return dirSize, nil
  1716. }
  1717. if info.Mode().IsRegular() {
  1718. return info.Size(), nil
  1719. }
  1720. return 0, nil
  1721. }
  1722. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1723. q, _ := conn.HasSpace(false, false, zipPath)
  1724. if q.HasSpace && q.GetRemainingSize() > 0 {
  1725. var size int64
  1726. for _, item := range paths {
  1727. info, err := conn.DoStat(item, 1, false)
  1728. if err != nil {
  1729. return size, err
  1730. }
  1731. itemSize, err := getSizeForPath(conn, item, info)
  1732. if err != nil {
  1733. return size, err
  1734. }
  1735. size += itemSize
  1736. }
  1737. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1738. // we assume the zip size will be half of the real size
  1739. return size / 2, nil
  1740. }
  1741. return -1, nil
  1742. }
  1743. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1744. user dataprovider.User,
  1745. ) error {
  1746. user, err := getUserForEventAction(user)
  1747. if err != nil {
  1748. return err
  1749. }
  1750. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1751. err = user.CheckFsRoot(connectionID)
  1752. defer user.CloseFs() //nolint:errcheck
  1753. if err != nil {
  1754. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1755. }
  1756. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1757. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1758. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1759. paths := make([]string, 0, len(c.Paths))
  1760. for idx := range c.Paths {
  1761. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1762. if p == name {
  1763. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1764. }
  1765. paths = append(paths, p)
  1766. }
  1767. paths = util.RemoveDuplicates(paths, false)
  1768. estimatedSize, err := estimateZipSize(conn, name, paths)
  1769. if err != nil {
  1770. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1771. return fmt.Errorf("unable to estimate archive size: %w", err)
  1772. }
  1773. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1774. if err != nil {
  1775. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1776. return fmt.Errorf("unable to create archive: %w", err)
  1777. }
  1778. defer cancelFn()
  1779. baseDir := getArchiveBaseDir(paths)
  1780. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1781. zipWriter := &zipWriterWrapper{
  1782. Name: name,
  1783. Writer: zip.NewWriter(writer),
  1784. Entries: make(map[string]bool),
  1785. }
  1786. startTime := time.Now()
  1787. for _, item := range paths {
  1788. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1789. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1790. return err
  1791. }
  1792. }
  1793. if err := zipWriter.Writer.Close(); err != nil {
  1794. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1795. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1796. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1797. }
  1798. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1799. }
  1800. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1801. params *EventParams,
  1802. ) error {
  1803. users, err := params.getUsers()
  1804. if err != nil {
  1805. return fmt.Errorf("unable to get users: %w", err)
  1806. }
  1807. var failures []string
  1808. executed := 0
  1809. for _, user := range users {
  1810. // if sender is set, the conditions have already been evaluated
  1811. if params.sender == "" {
  1812. if !checkUserConditionOptions(&user, &conditions) {
  1813. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1814. user.Username)
  1815. continue
  1816. }
  1817. }
  1818. executed++
  1819. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1820. failures = append(failures, user.Username)
  1821. params.AddError(err)
  1822. }
  1823. }
  1824. if len(failures) > 0 {
  1825. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1826. }
  1827. if executed == 0 {
  1828. eventManagerLog(logger.LevelError, "no existence check executed")
  1829. return errors.New("no existence check executed")
  1830. }
  1831. return nil
  1832. }
  1833. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1834. conditions dataprovider.ConditionOptions, params *EventParams,
  1835. ) error {
  1836. users, err := params.getUsers()
  1837. if err != nil {
  1838. return fmt.Errorf("unable to get users: %w", err)
  1839. }
  1840. var failures []string
  1841. executed := 0
  1842. for _, user := range users {
  1843. // if sender is set, the conditions have already been evaluated
  1844. if params.sender == "" {
  1845. if !checkUserConditionOptions(&user, &conditions) {
  1846. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1847. user.Username)
  1848. continue
  1849. }
  1850. }
  1851. executed++
  1852. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1853. failures = append(failures, user.Username)
  1854. params.AddError(err)
  1855. }
  1856. }
  1857. if len(failures) > 0 {
  1858. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1859. }
  1860. if executed == 0 {
  1861. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1862. return errors.New("no file/folder compressed")
  1863. }
  1864. return nil
  1865. }
  1866. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1867. params *EventParams,
  1868. ) error {
  1869. addObjectData := false
  1870. replacements := params.getStringReplacements(addObjectData, false)
  1871. replacer := strings.NewReplacer(replacements...)
  1872. switch c.Type {
  1873. case dataprovider.FilesystemActionRename:
  1874. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1875. case dataprovider.FilesystemActionDelete:
  1876. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1877. case dataprovider.FilesystemActionMkdirs:
  1878. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1879. case dataprovider.FilesystemActionExist:
  1880. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1881. case dataprovider.FilesystemActionCompress:
  1882. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1883. case dataprovider.FilesystemActionCopy:
  1884. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1885. default:
  1886. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1887. }
  1888. }
  1889. func executeQuotaResetForUser(user *dataprovider.User) error {
  1890. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1891. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1892. user.Username, err)
  1893. return err
  1894. }
  1895. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1896. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1897. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1898. }
  1899. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1900. numFiles, size, err := user.ScanQuota()
  1901. if err != nil {
  1902. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1903. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1904. }
  1905. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1906. if err != nil {
  1907. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1908. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1909. }
  1910. return nil
  1911. }
  1912. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1913. users, err := params.getUsers()
  1914. if err != nil {
  1915. return fmt.Errorf("unable to get users: %w", err)
  1916. }
  1917. var failures []string
  1918. executed := 0
  1919. for _, user := range users {
  1920. // if sender is set, the conditions have already been evaluated
  1921. if params.sender == "" {
  1922. if !checkUserConditionOptions(&user, &conditions) {
  1923. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  1924. user.Username)
  1925. continue
  1926. }
  1927. }
  1928. executed++
  1929. if err = executeQuotaResetForUser(&user); err != nil {
  1930. params.AddError(err)
  1931. failures = append(failures, user.Username)
  1932. }
  1933. }
  1934. if len(failures) > 0 {
  1935. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  1936. }
  1937. if executed == 0 {
  1938. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1939. return errors.New("no user quota reset executed")
  1940. }
  1941. return nil
  1942. }
  1943. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1944. folders, err := params.getFolders()
  1945. if err != nil {
  1946. return fmt.Errorf("unable to get folders: %w", err)
  1947. }
  1948. var failures []string
  1949. executed := 0
  1950. for _, folder := range folders {
  1951. // if sender is set, the conditions have already been evaluated
  1952. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1953. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1954. folder.Name)
  1955. continue
  1956. }
  1957. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1958. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1959. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1960. failures = append(failures, folder.Name)
  1961. continue
  1962. }
  1963. executed++
  1964. f := vfs.VirtualFolder{
  1965. BaseVirtualFolder: folder,
  1966. VirtualPath: "/",
  1967. }
  1968. numFiles, size, err := f.ScanQuota()
  1969. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1970. if err != nil {
  1971. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1972. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1973. failures = append(failures, folder.Name)
  1974. continue
  1975. }
  1976. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1977. if err != nil {
  1978. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1979. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1980. failures = append(failures, folder.Name)
  1981. }
  1982. }
  1983. if len(failures) > 0 {
  1984. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  1985. }
  1986. if executed == 0 {
  1987. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1988. return errors.New("no folder quota reset executed")
  1989. }
  1990. return nil
  1991. }
  1992. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1993. users, err := params.getUsers()
  1994. if err != nil {
  1995. return fmt.Errorf("unable to get users: %w", err)
  1996. }
  1997. var failures []string
  1998. executed := 0
  1999. for _, user := range users {
  2000. // if sender is set, the conditions have already been evaluated
  2001. if params.sender == "" {
  2002. if !checkUserConditionOptions(&user, &conditions) {
  2003. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  2004. user.Username)
  2005. continue
  2006. }
  2007. }
  2008. executed++
  2009. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  2010. if err != nil {
  2011. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  2012. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  2013. failures = append(failures, user.Username)
  2014. }
  2015. }
  2016. if len(failures) > 0 {
  2017. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  2018. }
  2019. if executed == 0 {
  2020. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  2021. return errors.New("no transfer quota reset executed")
  2022. }
  2023. return nil
  2024. }
  2025. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  2026. params *EventParams, actionName string,
  2027. ) error {
  2028. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2029. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  2030. user.Username, err)
  2031. return err
  2032. }
  2033. check := RetentionCheck{
  2034. Folders: folders,
  2035. }
  2036. c := RetentionChecks.Add(check, &user)
  2037. if c == nil {
  2038. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  2039. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2040. }
  2041. defer func() {
  2042. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2043. Username: user.Username,
  2044. ActionName: actionName,
  2045. Results: c.results,
  2046. })
  2047. }()
  2048. if err := c.Start(); err != nil {
  2049. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2050. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2051. }
  2052. return nil
  2053. }
  2054. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2055. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2056. ) error {
  2057. users, err := params.getUsers()
  2058. if err != nil {
  2059. return fmt.Errorf("unable to get users: %w", err)
  2060. }
  2061. var failures []string
  2062. executed := 0
  2063. for _, user := range users {
  2064. // if sender is set, the conditions have already been evaluated
  2065. if params.sender == "" {
  2066. if !checkUserConditionOptions(&user, &conditions) {
  2067. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2068. user.Username)
  2069. continue
  2070. }
  2071. }
  2072. executed++
  2073. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2074. failures = append(failures, user.Username)
  2075. params.AddError(err)
  2076. }
  2077. }
  2078. if len(failures) > 0 {
  2079. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2080. }
  2081. if executed == 0 {
  2082. eventManagerLog(logger.LevelError, "no retention check executed")
  2083. return errors.New("no retention check executed")
  2084. }
  2085. return nil
  2086. }
  2087. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2088. users, err := params.getUsers()
  2089. if err != nil {
  2090. return fmt.Errorf("unable to get users: %w", err)
  2091. }
  2092. var failures []string
  2093. var executed int
  2094. for _, user := range users {
  2095. // if sender is set, the conditions have already been evaluated
  2096. if params.sender == "" {
  2097. if !checkUserConditionOptions(&user, &conditions) {
  2098. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2099. user.Username)
  2100. continue
  2101. }
  2102. }
  2103. executed++
  2104. if user.ExpirationDate > 0 {
  2105. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2106. if expDate.Before(time.Now()) {
  2107. failures = append(failures, user.Username)
  2108. }
  2109. }
  2110. }
  2111. if len(failures) > 0 {
  2112. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2113. }
  2114. if executed == 0 {
  2115. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2116. return errors.New("no user expiration check executed")
  2117. }
  2118. return nil
  2119. }
  2120. func executeMetadataCheckForUser(user *dataprovider.User) error {
  2121. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2122. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  2123. user.Username, err)
  2124. return err
  2125. }
  2126. if !ActiveMetadataChecks.Add(user.Username, user.Role) {
  2127. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  2128. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  2129. }
  2130. defer ActiveMetadataChecks.Remove(user.Username)
  2131. if err := user.CheckMetadataConsistency(); err != nil {
  2132. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  2133. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  2134. }
  2135. return nil
  2136. }
  2137. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2138. users, err := params.getUsers()
  2139. if err != nil {
  2140. return fmt.Errorf("unable to get users: %w", err)
  2141. }
  2142. var failures []string
  2143. var executed int
  2144. for _, user := range users {
  2145. // if sender is set, the conditions have already been evaluated
  2146. if params.sender == "" {
  2147. if !checkUserConditionOptions(&user, &conditions) {
  2148. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, condition options don't match",
  2149. user.Username)
  2150. continue
  2151. }
  2152. }
  2153. executed++
  2154. if err = executeMetadataCheckForUser(&user); err != nil {
  2155. params.AddError(err)
  2156. failures = append(failures, user.Username)
  2157. }
  2158. }
  2159. if len(failures) > 0 {
  2160. return fmt.Errorf("metadata check failed for users: %s", strings.Join(failures, ", "))
  2161. }
  2162. if executed == 0 {
  2163. eventManagerLog(logger.LevelError, "no metadata check executed")
  2164. return errors.New("no metadata check executed")
  2165. }
  2166. return nil
  2167. }
  2168. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2169. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2170. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2171. user.Username, err)
  2172. return err
  2173. }
  2174. if user.ExpirationDate > 0 {
  2175. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2176. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2177. user.Username, expDate)
  2178. return nil
  2179. }
  2180. }
  2181. if user.Filters.PasswordExpiration == 0 {
  2182. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2183. return nil
  2184. }
  2185. days := user.PasswordExpiresIn()
  2186. if days > config.Threshold {
  2187. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2188. user.Username, days, config.Threshold)
  2189. return nil
  2190. }
  2191. body := new(bytes.Buffer)
  2192. data := make(map[string]any)
  2193. data["Username"] = user.Username
  2194. data["Days"] = days
  2195. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2196. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2197. user.Username, err)
  2198. return err
  2199. }
  2200. subject := "SFTPGo password expiration notification"
  2201. startTime := time.Now()
  2202. if err := smtp.SendEmail([]string{user.Email}, nil, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2203. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2204. user.Username, err, time.Since(startTime))
  2205. return err
  2206. }
  2207. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2208. user.Username, days, time.Since(startTime))
  2209. return nil
  2210. }
  2211. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2212. params *EventParams) error {
  2213. users, err := params.getUsers()
  2214. if err != nil {
  2215. return fmt.Errorf("unable to get users: %w", err)
  2216. }
  2217. var failures []string
  2218. for _, user := range users {
  2219. // if sender is set, the conditions have already been evaluated
  2220. if params.sender == "" {
  2221. if !checkUserConditionOptions(&user, &conditions) {
  2222. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2223. user.Username)
  2224. continue
  2225. }
  2226. }
  2227. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2228. params.AddError(err)
  2229. failures = append(failures, user.Username)
  2230. }
  2231. }
  2232. if len(failures) > 0 {
  2233. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2234. }
  2235. return nil
  2236. }
  2237. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2238. admin, err := dataprovider.AdminExists(params.Name)
  2239. exists := err == nil
  2240. if exists && c.Mode == 1 {
  2241. return &admin, nil
  2242. }
  2243. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2244. return nil, err
  2245. }
  2246. replacements := params.getStringReplacements(false, true)
  2247. replacer := strings.NewReplacer(replacements...)
  2248. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2249. var newAdmin dataprovider.Admin
  2250. err = json.Unmarshal([]byte(data), &newAdmin)
  2251. if err != nil {
  2252. return nil, err
  2253. }
  2254. if newAdmin.Password == "" {
  2255. newAdmin.Password = util.GenerateUniqueID()
  2256. }
  2257. if exists {
  2258. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2259. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2260. } else {
  2261. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2262. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2263. }
  2264. return &newAdmin, err
  2265. }
  2266. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2267. user, err := dataprovider.UserExists(params.Name, "")
  2268. exists := err == nil
  2269. if exists && c.Mode == 1 {
  2270. err = user.LoadAndApplyGroupSettings()
  2271. return &user, err
  2272. }
  2273. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2274. return nil, err
  2275. }
  2276. replacements := params.getStringReplacements(false, true)
  2277. replacer := strings.NewReplacer(replacements...)
  2278. data := replaceWithReplacer(c.TemplateUser, replacer)
  2279. var newUser dataprovider.User
  2280. err = json.Unmarshal([]byte(data), &newUser)
  2281. if err != nil {
  2282. return nil, err
  2283. }
  2284. if exists {
  2285. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2286. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2287. } else {
  2288. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2289. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2290. }
  2291. if err != nil {
  2292. return nil, err
  2293. }
  2294. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2295. return &u, err
  2296. }
  2297. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  2298. conditions dataprovider.ConditionOptions,
  2299. ) error {
  2300. var err error
  2301. switch action.Type {
  2302. case dataprovider.ActionTypeHTTP:
  2303. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2304. case dataprovider.ActionTypeCommand:
  2305. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2306. case dataprovider.ActionTypeEmail:
  2307. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2308. case dataprovider.ActionTypeBackup:
  2309. var backupPath string
  2310. backupPath, err = dataprovider.ExecuteBackup()
  2311. if err == nil {
  2312. params.setBackupParams(backupPath)
  2313. }
  2314. case dataprovider.ActionTypeUserQuotaReset:
  2315. err = executeUsersQuotaResetRuleAction(conditions, params)
  2316. case dataprovider.ActionTypeFolderQuotaReset:
  2317. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2318. case dataprovider.ActionTypeTransferQuotaReset:
  2319. err = executeTransferQuotaResetRuleAction(conditions, params)
  2320. case dataprovider.ActionTypeDataRetentionCheck:
  2321. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2322. case dataprovider.ActionTypeMetadataCheck:
  2323. err = executeMetadataCheckRuleAction(conditions, params)
  2324. case dataprovider.ActionTypeFilesystem:
  2325. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2326. case dataprovider.ActionTypePasswordExpirationCheck:
  2327. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2328. case dataprovider.ActionTypeUserExpirationCheck:
  2329. err = executeUserExpirationCheckRuleAction(conditions, params)
  2330. default:
  2331. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2332. }
  2333. if err != nil {
  2334. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2335. }
  2336. params.AddError(err)
  2337. return err
  2338. }
  2339. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2340. *dataprovider.Admin, error,
  2341. ) {
  2342. for _, action := range rule.Actions {
  2343. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2344. startTime := time.Now()
  2345. var user *dataprovider.User
  2346. var admin *dataprovider.Admin
  2347. var err error
  2348. var failedActions []string
  2349. paramsCopy := params.getACopy()
  2350. switch params.Event {
  2351. case IDPLoginAdmin:
  2352. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2353. case IDPLoginUser:
  2354. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2355. default:
  2356. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2357. }
  2358. if err != nil {
  2359. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2360. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2361. failedActions = append(failedActions, action.Name)
  2362. } else {
  2363. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2364. action.Name, rule.Name, time.Since(startTime))
  2365. }
  2366. // execute async actions if any, including failure actions
  2367. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2368. return user, admin, err
  2369. }
  2370. }
  2371. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2372. return nil, nil, errors.New("no action executed")
  2373. }
  2374. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2375. var errRes error
  2376. for _, rule := range rules {
  2377. var failedActions []string
  2378. paramsCopy := params.getACopy()
  2379. for _, action := range rule.Actions {
  2380. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2381. startTime := time.Now()
  2382. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2383. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2384. action.Name, rule.Name, time.Since(startTime), err)
  2385. failedActions = append(failedActions, action.Name)
  2386. // we return the last error, it is ok for now
  2387. errRes = err
  2388. if action.Options.StopOnFailure {
  2389. break
  2390. }
  2391. } else {
  2392. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2393. action.Name, rule.Name, time.Since(startTime))
  2394. }
  2395. }
  2396. }
  2397. // execute async actions if any, including failure actions
  2398. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2399. }
  2400. return errRes
  2401. }
  2402. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2403. eventManager.addAsyncTask()
  2404. defer eventManager.removeAsyncTask()
  2405. for _, rule := range rules {
  2406. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2407. }
  2408. }
  2409. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2410. for _, action := range rule.Actions {
  2411. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2412. startTime := time.Now()
  2413. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2414. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2415. action.Name, rule.Name, time.Since(startTime), err)
  2416. failedActions = append(failedActions, action.Name)
  2417. if action.Options.StopOnFailure {
  2418. break
  2419. }
  2420. } else {
  2421. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2422. action.Name, rule.Name, time.Since(startTime))
  2423. }
  2424. }
  2425. }
  2426. if len(failedActions) > 0 {
  2427. params.updateStatusFromError = false
  2428. // execute failure actions
  2429. for _, action := range rule.Actions {
  2430. if action.Options.IsFailureAction {
  2431. startTime := time.Now()
  2432. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2433. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2434. action.Name, rule.Name, time.Since(startTime), err)
  2435. if action.Options.StopOnFailure {
  2436. break
  2437. }
  2438. } else {
  2439. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2440. action.Name, rule.Name, time.Since(startTime))
  2441. }
  2442. }
  2443. }
  2444. }
  2445. }
  2446. type eventCronJob struct {
  2447. ruleName string
  2448. }
  2449. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2450. if rule.GuardFromConcurrentExecution() {
  2451. task, err := dataprovider.GetTaskByName(rule.Name)
  2452. if err != nil {
  2453. if errors.Is(err, util.ErrNotFound) {
  2454. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2455. task = dataprovider.Task{
  2456. Name: rule.Name,
  2457. UpdateAt: 0,
  2458. Version: 0,
  2459. }
  2460. err = dataprovider.AddTask(rule.Name)
  2461. if err != nil {
  2462. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2463. return task, err
  2464. }
  2465. } else {
  2466. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2467. }
  2468. }
  2469. return task, err
  2470. }
  2471. return dataprovider.Task{}, nil
  2472. }
  2473. func (j *eventCronJob) Run() {
  2474. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2475. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2476. if err != nil {
  2477. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2478. return
  2479. }
  2480. if err := rule.CheckActionsConsistency(""); err != nil {
  2481. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2482. return
  2483. }
  2484. task, err := j.getTask(&rule)
  2485. if err != nil {
  2486. return
  2487. }
  2488. if task.Name != "" {
  2489. updateInterval := 5 * time.Minute
  2490. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2491. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2492. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2493. return
  2494. }
  2495. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2496. if err != nil {
  2497. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2498. rule.Name, err)
  2499. return
  2500. }
  2501. ticker := time.NewTicker(updateInterval)
  2502. done := make(chan bool)
  2503. defer func() {
  2504. done <- true
  2505. ticker.Stop()
  2506. }()
  2507. go func(taskName string) {
  2508. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2509. for {
  2510. select {
  2511. case <-done:
  2512. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2513. return
  2514. case <-ticker.C:
  2515. err := dataprovider.UpdateTaskTimestamp(taskName)
  2516. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2517. }
  2518. }
  2519. }(task.Name)
  2520. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2521. } else {
  2522. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2523. }
  2524. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2525. }
  2526. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2527. func RunOnDemandRule(name string) error {
  2528. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2529. rule, err := dataprovider.EventRuleExists(name)
  2530. if err != nil {
  2531. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2532. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2533. }
  2534. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2535. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2536. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2537. }
  2538. if rule.Status != 1 {
  2539. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2540. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2541. }
  2542. if err := rule.CheckActionsConsistency(""); err != nil {
  2543. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2544. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2545. }
  2546. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2547. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2548. return nil
  2549. }
  2550. type zipWriterWrapper struct {
  2551. Name string
  2552. Entries map[string]bool
  2553. Writer *zip.Writer
  2554. }
  2555. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2556. logger.Log(level, "eventmanager", "", format, v...)
  2557. }