eventmanager.go 91 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "time"
  37. "github.com/bmatcuk/doublestar/v4"
  38. "github.com/klauspost/compress/zip"
  39. "github.com/robfig/cron/v3"
  40. "github.com/rs/xid"
  41. "github.com/sftpgo/sdk"
  42. "github.com/wneessen/go-mail"
  43. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/plugin"
  46. "github.com/drakkan/sftpgo/v2/internal/smtp"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/vfs"
  49. )
  // Internal constants shared by the event manager.
  const (
  	// ipBlockedEventName is the label for the "IP Blocked" event.
  	ipBlockedEventName = "IP Blocked"
  	// maxAttachmentsSize is 10 MiB expressed in bytes.
  	// NOTE(review): presumably the cap on attachments size; the usage is outside this view.
  	maxAttachmentsSize int64 = 10 << 20
  	// objDataPlaceholder and objDataPlaceholderString are template placeholders
  	// for the object data; how they are expanded is not visible in this chunk.
  	objDataPlaceholder       = "{{ObjectData}}"
  	objDataPlaceholderString = "{{ObjectDataString}}"
  )
  // Supported IDP login events
  const (
  	// IDPLoginUser is the event name for a user login from an Identity Provider.
  	IDPLoginUser = "IDP login user"
  	// IDPLoginAdmin is the event name for an admin login from an Identity Provider.
  	IDPLoginAdmin = "IDP login admin"
  )
  61. var (
  62. // eventManager handle the supported event rules actions
  63. eventManager eventRulesContainer
  64. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  65. )
  66. func init() {
  67. eventManager = eventRulesContainer{
  68. schedulesMapping: make(map[string][]cron.EntryID),
  69. // arbitrary maximum number of concurrent asynchronous tasks,
  70. // each task could execute multiple actions
  71. concurrencyGuard: make(chan struct{}, 200),
  72. }
  73. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  74. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  75. p := EventParams{
  76. Name: executor,
  77. ObjectName: objectName,
  78. Event: operation,
  79. Status: 1,
  80. ObjectType: objectType,
  81. IP: ip,
  82. Role: role,
  83. Timestamp: time.Now().UnixNano(),
  84. Object: object,
  85. }
  86. if u, ok := object.(*dataprovider.User); ok {
  87. p.Email = u.Email
  88. } else if a, ok := object.(*dataprovider.Admin); ok {
  89. p.Email = a.Email
  90. }
  91. eventManager.handleProviderEvent(p)
  92. })
  93. }
  94. // HandleCertificateEvent checks and executes action rules for certificate events
  95. func HandleCertificateEvent(params EventParams) {
  96. eventManager.handleCertificateEvent(params)
  97. }
  98. // HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider
  99. func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
  100. return eventManager.handleIDPLoginEvent(params, customFields)
  101. }
  102. // eventRulesContainer stores event rules by trigger
  103. type eventRulesContainer struct {
  104. sync.RWMutex
  105. lastLoad atomic.Int64
  106. FsEvents []dataprovider.EventRule
  107. ProviderEvents []dataprovider.EventRule
  108. Schedules []dataprovider.EventRule
  109. IPBlockedEvents []dataprovider.EventRule
  110. CertificateEvents []dataprovider.EventRule
  111. IPDLoginEvents []dataprovider.EventRule
  112. schedulesMapping map[string][]cron.EntryID
  113. concurrencyGuard chan struct{}
  114. }
  115. func (r *eventRulesContainer) addAsyncTask() {
  116. activeHooks.Add(1)
  117. r.concurrencyGuard <- struct{}{}
  118. }
  119. func (r *eventRulesContainer) removeAsyncTask() {
  120. activeHooks.Add(-1)
  121. <-r.concurrencyGuard
  122. }
  123. func (r *eventRulesContainer) getLastLoadTime() int64 {
  124. return r.lastLoad.Load()
  125. }
  126. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  127. r.lastLoad.Store(modTime)
  128. }
  129. // RemoveRule deletes the rule with the specified name
  130. func (r *eventRulesContainer) RemoveRule(name string) {
  131. r.Lock()
  132. defer r.Unlock()
  133. r.removeRuleInternal(name)
  134. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  135. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  136. }
  137. func (r *eventRulesContainer) removeRuleInternal(name string) {
  138. for idx := range r.FsEvents {
  139. if r.FsEvents[idx].Name == name {
  140. lastIdx := len(r.FsEvents) - 1
  141. r.FsEvents[idx] = r.FsEvents[lastIdx]
  142. r.FsEvents = r.FsEvents[:lastIdx]
  143. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  144. return
  145. }
  146. }
  147. for idx := range r.ProviderEvents {
  148. if r.ProviderEvents[idx].Name == name {
  149. lastIdx := len(r.ProviderEvents) - 1
  150. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  151. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  152. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  153. return
  154. }
  155. }
  156. for idx := range r.IPBlockedEvents {
  157. if r.IPBlockedEvents[idx].Name == name {
  158. lastIdx := len(r.IPBlockedEvents) - 1
  159. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  160. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  161. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  162. return
  163. }
  164. }
  165. for idx := range r.CertificateEvents {
  166. if r.CertificateEvents[idx].Name == name {
  167. lastIdx := len(r.CertificateEvents) - 1
  168. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  169. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  170. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  171. return
  172. }
  173. }
  174. for idx := range r.IPDLoginEvents {
  175. if r.IPDLoginEvents[idx].Name == name {
  176. lastIdx := len(r.IPDLoginEvents) - 1
  177. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  178. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  179. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  180. return
  181. }
  182. }
  183. for idx := range r.Schedules {
  184. if r.Schedules[idx].Name == name {
  185. if schedules, ok := r.schedulesMapping[name]; ok {
  186. for _, entryID := range schedules {
  187. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  188. eventScheduler.Remove(entryID)
  189. }
  190. delete(r.schedulesMapping, name)
  191. }
  192. lastIdx := len(r.Schedules) - 1
  193. r.Schedules[idx] = r.Schedules[lastIdx]
  194. r.Schedules = r.Schedules[:lastIdx]
  195. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  196. return
  197. }
  198. }
  199. }
  200. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  201. r.removeRuleInternal(rule.Name)
  202. if rule.DeletedAt > 0 {
  203. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  204. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  205. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  206. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  207. }
  208. return
  209. }
  210. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  211. return
  212. }
  213. switch rule.Trigger {
  214. case dataprovider.EventTriggerFsEvent:
  215. r.FsEvents = append(r.FsEvents, rule)
  216. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  217. case dataprovider.EventTriggerProviderEvent:
  218. r.ProviderEvents = append(r.ProviderEvents, rule)
  219. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  220. case dataprovider.EventTriggerIPBlocked:
  221. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  222. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  223. case dataprovider.EventTriggerCertificate:
  224. r.CertificateEvents = append(r.CertificateEvents, rule)
  225. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  226. case dataprovider.EventTriggerIDPLogin:
  227. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  228. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  229. case dataprovider.EventTriggerSchedule:
  230. for _, schedule := range rule.Conditions.Schedules {
  231. cronSpec := schedule.GetCronSpec()
  232. job := &eventCronJob{
  233. ruleName: dataprovider.ConvertName(rule.Name),
  234. }
  235. entryID, err := eventScheduler.AddJob(cronSpec, job)
  236. if err != nil {
  237. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  238. return
  239. }
  240. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  241. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  242. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  243. }
  244. r.Schedules = append(r.Schedules, rule)
  245. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  246. default:
  247. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  248. }
  249. }
  250. func (r *eventRulesContainer) loadRules() {
  251. eventManagerLog(logger.LevelDebug, "loading updated rules")
  252. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  253. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  254. if err != nil {
  255. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  256. return
  257. }
  258. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  259. if len(rules) > 0 {
  260. r.Lock()
  261. defer r.Unlock()
  262. for _, rule := range rules {
  263. r.addUpdateRuleInternal(rule)
  264. }
  265. }
  266. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  267. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  268. r.setLastLoadTime(modTime)
  269. }
  270. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  271. switch conditions.IDPLoginEvent {
  272. case dataprovider.IDPLoginUser:
  273. if params.Event != IDPLoginUser {
  274. return false
  275. }
  276. case dataprovider.IDPLoginAdmin:
  277. if params.Event != IDPLoginAdmin {
  278. return false
  279. }
  280. }
  281. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  282. }
  283. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  284. if !util.Contains(conditions.ProviderEvents, params.Event) {
  285. return false
  286. }
  287. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  288. return false
  289. }
  290. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  291. return false
  292. }
  293. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  294. return false
  295. }
  296. return true
  297. }
  298. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  299. if !util.Contains(conditions.FsEvents, params.Event) {
  300. return false
  301. }
  302. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  303. return false
  304. }
  305. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  306. return false
  307. }
  308. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  309. return false
  310. }
  311. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  312. return false
  313. }
  314. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  315. return false
  316. }
  317. if params.Event == operationUpload || params.Event == operationDownload {
  318. if conditions.Options.MinFileSize > 0 {
  319. if params.FileSize < conditions.Options.MinFileSize {
  320. return false
  321. }
  322. }
  323. if conditions.Options.MaxFileSize > 0 {
  324. if params.FileSize > conditions.Options.MaxFileSize {
  325. return false
  326. }
  327. }
  328. }
  329. return true
  330. }
  331. // hasFsRules returns true if there are any rules for filesystem event triggers
  332. func (r *eventRulesContainer) hasFsRules() bool {
  333. r.RLock()
  334. defer r.RUnlock()
  335. return len(r.FsEvents) > 0
  336. }
  337. // handleFsEvent executes the rules actions defined for the specified event.
  338. // The boolean parameter indicates whether a sync action was executed
  339. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  340. if params.Protocol == protocolEventAction {
  341. return false, nil
  342. }
  343. r.RLock()
  344. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  345. for _, rule := range r.FsEvents {
  346. if r.checkFsEventMatch(&rule.Conditions, &params) {
  347. if err := rule.CheckActionsConsistency(""); err != nil {
  348. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  349. rule.Name, err, params.Event)
  350. continue
  351. }
  352. hasSyncActions := false
  353. for _, action := range rule.Actions {
  354. if action.Options.ExecuteSync {
  355. hasSyncActions = true
  356. break
  357. }
  358. }
  359. if hasSyncActions {
  360. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  361. } else {
  362. rulesAsync = append(rulesAsync, rule)
  363. }
  364. }
  365. }
  366. r.RUnlock()
  367. params.sender = params.Name
  368. params.addUID()
  369. if len(rulesAsync) > 0 {
  370. go executeAsyncRulesActions(rulesAsync, params)
  371. }
  372. if len(rulesWithSyncActions) > 0 {
  373. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  374. }
  375. return false, nil
  376. }
  377. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  378. *dataprovider.Admin, error,
  379. ) {
  380. r.RLock()
  381. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  382. for _, rule := range r.IPDLoginEvents {
  383. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  384. if err := rule.CheckActionsConsistency(""); err != nil {
  385. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  386. rule.Name, err, params.Event)
  387. continue
  388. }
  389. hasSyncActions := false
  390. for _, action := range rule.Actions {
  391. if action.Options.ExecuteSync {
  392. hasSyncActions = true
  393. break
  394. }
  395. }
  396. if hasSyncActions {
  397. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  398. } else {
  399. rulesAsync = append(rulesAsync, rule)
  400. }
  401. }
  402. }
  403. r.RUnlock()
  404. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  405. return nil, nil, nil
  406. }
  407. params.addIDPCustomFields(customFields)
  408. if len(rulesWithSyncActions) > 1 {
  409. var ruleNames []string
  410. for _, r := range rulesWithSyncActions {
  411. ruleNames = append(ruleNames, r.Name)
  412. }
  413. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  414. }
  415. params.addUID()
  416. if len(rulesAsync) > 0 {
  417. go executeAsyncRulesActions(rulesAsync, params)
  418. }
  419. if len(rulesWithSyncActions) > 0 {
  420. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  421. }
  422. return nil, nil, nil
  423. }
  424. // username is populated for user objects
  425. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  426. r.RLock()
  427. defer r.RUnlock()
  428. var rules []dataprovider.EventRule
  429. for _, rule := range r.ProviderEvents {
  430. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  431. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  432. rules = append(rules, rule)
  433. } else {
  434. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  435. rule.Name, err, params.Event, params.ObjectType)
  436. }
  437. }
  438. }
  439. if len(rules) > 0 {
  440. params.sender = params.ObjectName
  441. go executeAsyncRulesActions(rules, params)
  442. }
  443. }
  444. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  445. r.RLock()
  446. defer r.RUnlock()
  447. if len(r.IPBlockedEvents) == 0 {
  448. return
  449. }
  450. var rules []dataprovider.EventRule
  451. for _, rule := range r.IPBlockedEvents {
  452. if err := rule.CheckActionsConsistency(""); err == nil {
  453. rules = append(rules, rule)
  454. } else {
  455. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  456. rule.Name, err, params.Event)
  457. }
  458. }
  459. if len(rules) > 0 {
  460. go executeAsyncRulesActions(rules, params)
  461. }
  462. }
  463. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  464. r.RLock()
  465. defer r.RUnlock()
  466. if len(r.CertificateEvents) == 0 {
  467. return
  468. }
  469. var rules []dataprovider.EventRule
  470. for _, rule := range r.CertificateEvents {
  471. if err := rule.CheckActionsConsistency(""); err == nil {
  472. rules = append(rules, rule)
  473. } else {
  474. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  475. rule.Name, err, params.Event)
  476. }
  477. }
  478. if len(rules) > 0 {
  479. go executeAsyncRulesActions(rules, params)
  480. }
  481. }
  482. type executedRetentionCheck struct {
  483. Username string
  484. ActionName string
  485. Results []folderRetentionCheckResult
  486. }
  487. // EventParams defines the supported event parameters
  488. type EventParams struct {
  489. Name string
  490. Groups []sdk.GroupMapping
  491. Event string
  492. Status int
  493. VirtualPath string
  494. FsPath string
  495. VirtualTargetPath string
  496. FsTargetPath string
  497. ObjectName string
  498. Extension string
  499. ObjectType string
  500. FileSize int64
  501. Elapsed int64
  502. Protocol string
  503. IP string
  504. Role string
  505. Email string
  506. Timestamp int64
  507. UID string
  508. IDPCustomFields *map[string]string
  509. Object plugin.Renderer
  510. Metadata map[string]string
  511. sender string
  512. updateStatusFromError bool
  513. errors []string
  514. retentionChecks []executedRetentionCheck
  515. }
  516. func (p *EventParams) getACopy() *EventParams {
  517. params := *p
  518. params.errors = make([]string, len(p.errors))
  519. copy(params.errors, p.errors)
  520. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  521. for _, c := range p.retentionChecks {
  522. executedCheck := executedRetentionCheck{
  523. Username: c.Username,
  524. ActionName: c.ActionName,
  525. }
  526. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  527. copy(executedCheck.Results, c.Results)
  528. retentionChecks = append(retentionChecks, executedCheck)
  529. }
  530. params.retentionChecks = retentionChecks
  531. if p.IDPCustomFields != nil {
  532. fields := make(map[string]string)
  533. for k, v := range *p.IDPCustomFields {
  534. fields[k] = v
  535. }
  536. params.IDPCustomFields = &fields
  537. }
  538. if len(params.Metadata) > 0 {
  539. metadata := make(map[string]string)
  540. for k, v := range p.Metadata {
  541. metadata[k] = v
  542. }
  543. params.Metadata = metadata
  544. }
  545. return &params
  546. }
  547. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  548. if customFields == nil || len(*customFields) == 0 {
  549. return
  550. }
  551. fields := make(map[string]string)
  552. for k, v := range *customFields {
  553. switch val := v.(type) {
  554. case string:
  555. fields[k] = val
  556. }
  557. }
  558. p.IDPCustomFields = &fields
  559. }
  560. // AddError adds a new error to the event params and update the status if needed
  561. func (p *EventParams) AddError(err error) {
  562. if err == nil {
  563. return
  564. }
  565. if p.updateStatusFromError && p.Status == 1 {
  566. p.Status = 2
  567. }
  568. p.errors = append(p.errors, err.Error())
  569. }
  570. func (p *EventParams) addUID() {
  571. if p.UID == "" {
  572. p.UID = util.GenerateUniqueID()
  573. }
  574. }
  575. func (p *EventParams) setBackupParams(backupPath string) {
  576. if p.sender != "" {
  577. return
  578. }
  579. p.sender = dataprovider.ActionExecutorSystem
  580. p.FsPath = backupPath
  581. p.ObjectName = filepath.Base(backupPath)
  582. p.VirtualPath = "/" + p.ObjectName
  583. p.Timestamp = time.Now().UnixNano()
  584. info, err := os.Stat(backupPath)
  585. if err == nil {
  586. p.FileSize = info.Size()
  587. }
  588. }
  589. func (p *EventParams) getStatusString() string {
  590. switch p.Status {
  591. case 1:
  592. return "OK"
  593. default:
  594. return "KO"
  595. }
  596. }
  597. // getUsers returns users with group settings not applied
  598. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  599. if p.sender == "" {
  600. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeUsers})
  601. if err != nil {
  602. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  603. return nil, errors.New("unable to get users")
  604. }
  605. return dump.Users, nil
  606. }
  607. user, err := p.getUserFromSender()
  608. if err != nil {
  609. return nil, err
  610. }
  611. return []dataprovider.User{user}, nil
  612. }
  613. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  614. if p.sender == dataprovider.ActionExecutorSystem {
  615. return dataprovider.User{
  616. BaseUser: sdk.BaseUser{
  617. Status: 1,
  618. Username: p.sender,
  619. HomeDir: dataprovider.GetBackupsPath(),
  620. Permissions: map[string][]string{
  621. "/": {dataprovider.PermAny},
  622. },
  623. },
  624. }, nil
  625. }
  626. user, err := dataprovider.UserExists(p.sender, "")
  627. if err != nil {
  628. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  629. return user, fmt.Errorf("error getting user %q", p.sender)
  630. }
  631. return user, nil
  632. }
  633. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  634. if p.sender == "" {
  635. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeFolders})
  636. return dump.Folders, err
  637. }
  638. folder, err := dataprovider.GetFolderByName(p.sender)
  639. if err != nil {
  640. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  641. }
  642. return []vfs.BaseVirtualFolder{folder}, nil
  643. }
  644. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  645. if len(p.retentionChecks) == 0 {
  646. return nil, errors.New("no data retention report available")
  647. }
  648. var b bytes.Buffer
  649. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  650. return nil, err
  651. }
  652. return b.Bytes(), nil
  653. }
  654. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  655. var n int64
  656. wr := zip.NewWriter(w)
  657. for _, check := range p.retentionChecks {
  658. data, err := getCSVRetentionReport(check.Results)
  659. if err != nil {
  660. return n, fmt.Errorf("unable to get CSV report: %w", err)
  661. }
  662. dataSize := int64(len(data))
  663. n += dataSize
  664. // we suppose a 3:1 compression ratio
  665. if n > (maxAttachmentsSize * 3) {
  666. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  667. util.ByteCountIEC(n))
  668. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  669. }
  670. fh := &zip.FileHeader{
  671. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  672. Method: zip.Deflate,
  673. Modified: time.Now().UTC(),
  674. }
  675. f, err := wr.CreateHeader(fh)
  676. if err != nil {
  677. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  678. }
  679. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  680. if err != nil {
  681. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  682. }
  683. }
  684. if err := wr.Close(); err != nil {
  685. return n, fmt.Errorf("unable to close zip writer: %w", err)
  686. }
  687. return n, nil
  688. }
  689. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  690. if len(p.retentionChecks) == 0 {
  691. return nil, errors.New("no data retention report available")
  692. }
  693. return &mail.File{
  694. Name: "retention-reports.zip",
  695. Header: make(map[string][]string),
  696. Writer: p.writeCompressedDataRetentionReports,
  697. }, nil
  698. }
  699. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  700. if jsonEscaped {
  701. return util.JSONEscape(val)
  702. }
  703. return val
  704. }
  705. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  706. replacements := []string{
  707. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  708. "{{Event}}", p.Event,
  709. "{{Status}}", fmt.Sprintf("%d", p.Status),
  710. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  711. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  712. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  713. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  714. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  715. "{{ObjectType}}", p.ObjectType,
  716. "{{FileSize}}", strconv.FormatInt(p.FileSize, 10),
  717. "{{Elapsed}}", strconv.FormatInt(p.Elapsed, 10),
  718. "{{Protocol}}", p.Protocol,
  719. "{{IP}}", p.IP,
  720. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  721. "{{Email}}", p.getStringReplacement(p.Email, jsonEscaped),
  722. "{{Timestamp}}", strconv.FormatInt(p.Timestamp, 10),
  723. "{{StatusString}}", p.getStatusString(),
  724. "{{UID}}", p.getStringReplacement(p.UID, jsonEscaped),
  725. "{{Ext}}", p.getStringReplacement(p.Extension, jsonEscaped),
  726. }
  727. if p.VirtualPath != "" {
  728. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  729. }
  730. if p.VirtualTargetPath != "" {
  731. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  732. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  733. }
  734. if len(p.errors) > 0 {
  735. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  736. } else {
  737. replacements = append(replacements, "{{ErrorString}}", "")
  738. }
  739. replacements = append(replacements, objDataPlaceholder, "{}")
  740. replacements = append(replacements, objDataPlaceholderString, "")
  741. if addObjectData {
  742. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  743. if err == nil {
  744. dataString := string(data)
  745. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  746. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  747. }
  748. }
  749. if p.IDPCustomFields != nil {
  750. for k, v := range *p.IDPCustomFields {
  751. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  752. }
  753. }
  754. replacements = append(replacements, "{{Metadata}}", "{}")
  755. replacements = append(replacements, "{{MetadataString}}", "")
  756. if len(p.Metadata) > 0 {
  757. data, err := json.Marshal(p.Metadata)
  758. if err == nil {
  759. dataString := string(data)
  760. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  761. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  762. }
  763. }
  764. return replacements
  765. }
  766. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  767. var b bytes.Buffer
  768. csvWriter := csv.NewWriter(&b)
  769. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  770. "elapsed (ms)", "info", "error"})
  771. if err != nil {
  772. return nil, err
  773. }
  774. for _, result := range results {
  775. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  776. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  777. result.Info, result.Error})
  778. if err != nil {
  779. return nil, err
  780. }
  781. }
  782. csvWriter.Flush()
  783. err = csvWriter.Error()
  784. return b.Bytes(), err
  785. }
  786. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
  787. numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
  788. ) error {
  789. var fsDstPath string
  790. var errDstFs error
  791. errWrite := w.Close()
  792. targetPath := virtualSourcePath
  793. if virtualTargetPath != "" {
  794. targetPath = virtualTargetPath
  795. var fsDst vfs.Fs
  796. fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
  797. if errTransfer != nil && errDstFs == nil {
  798. // try to remove a partial file on error. If this fails, we can't do anything
  799. errRemove := fsDst.Remove(fsDstPath, false)
  800. conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
  801. }
  802. }
  803. info, err := conn.doStatInternal(targetPath, 0, false, false)
  804. if err == nil {
  805. updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
  806. var fsSrcPath string
  807. var errSrcFs error
  808. if virtualSourcePath != "" {
  809. _, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
  810. }
  811. if errSrcFs == nil && errDstFs == nil {
  812. elapsed := time.Since(startTime).Nanoseconds() / 1000000
  813. if errTransfer == nil {
  814. errTransfer = errWrite
  815. }
  816. if operation == operationCopy {
  817. logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
  818. "", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
  819. }
  820. ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed, nil) //nolint:errcheck
  821. }
  822. } else {
  823. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
  824. }
  825. if errTransfer != nil {
  826. return errTransfer
  827. }
  828. return errWrite
  829. }
  830. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  831. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  832. if err != nil {
  833. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  834. return
  835. }
  836. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  837. if vfolder.IsIncludedInUserQuota() {
  838. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  839. }
  840. }
  841. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  842. if numFiles == 0 {
  843. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  844. return conn.GetPermissionDeniedError()
  845. }
  846. } else {
  847. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  848. return conn.GetPermissionDeniedError()
  849. }
  850. }
  851. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  852. if !q.HasSpace {
  853. return conn.GetQuotaExceededError()
  854. }
  855. if expectedSize != -1 {
  856. sizeDiff := expectedSize - truncatedSize
  857. if sizeDiff > 0 {
  858. remainingSize := q.GetRemainingSize()
  859. if remainingSize > 0 && remainingSize < sizeDiff {
  860. return conn.GetQuotaExceededError()
  861. }
  862. }
  863. }
  864. return nil
  865. }
  866. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  867. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  868. if err != nil {
  869. return nil, 0, 0, nil, err
  870. }
  871. var truncatedSize, fileSize int64
  872. numFiles := 1
  873. isFileOverwrite := false
  874. info, err := fs.Lstat(fsPath)
  875. if err == nil {
  876. fileSize = info.Size()
  877. if info.IsDir() {
  878. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  879. }
  880. if info.Mode().IsRegular() {
  881. isFileOverwrite = true
  882. truncatedSize = fileSize
  883. }
  884. numFiles = 0
  885. }
  886. if err != nil && !fs.IsNotExist(err) {
  887. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  888. }
  889. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  890. return nil, numFiles, truncatedSize, nil, err
  891. }
  892. f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1, false))
  893. if err != nil {
  894. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  895. }
  896. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  897. if isFileOverwrite {
  898. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  899. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  900. truncatedSize = 0
  901. }
  902. }
  903. if cancelFn == nil {
  904. cancelFn = func() {}
  905. }
  906. if f != nil {
  907. return f, numFiles, truncatedSize, cancelFn, nil
  908. }
  909. return w, numFiles, truncatedSize, cancelFn, nil
  910. }
  911. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  912. if entryPath == wr.Name {
  913. // skip the archive itself
  914. return nil
  915. }
  916. info, err := conn.DoStat(entryPath, 1, false)
  917. if err != nil {
  918. eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
  919. return err
  920. }
  921. entryName, err := getZipEntryName(entryPath, baseDir)
  922. if err != nil {
  923. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  924. return err
  925. }
  926. if _, ok := wr.Entries[entryName]; ok {
  927. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  928. return nil
  929. }
  930. wr.Entries[entryName] = true
  931. if info.IsDir() {
  932. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  933. Name: entryName + "/",
  934. Method: zip.Deflate,
  935. Modified: info.ModTime(),
  936. })
  937. if err != nil {
  938. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  939. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  940. }
  941. contents, err := conn.ListDir(entryPath)
  942. if err != nil {
  943. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  944. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  945. }
  946. for _, info := range contents {
  947. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  948. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  949. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  950. return err
  951. }
  952. }
  953. return nil
  954. }
  955. if !info.Mode().IsRegular() {
  956. // we only allow regular files
  957. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  958. return nil
  959. }
  960. reader, cancelFn, err := getFileReader(conn, entryPath)
  961. if err != nil {
  962. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  963. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  964. }
  965. defer cancelFn()
  966. defer reader.Close()
  967. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  968. Name: entryName,
  969. Method: zip.Deflate,
  970. Modified: info.ModTime(),
  971. })
  972. if err != nil {
  973. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  974. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  975. }
  976. _, err = io.Copy(f, reader)
  977. return err
  978. }
  979. func getZipEntryName(entryPath, baseDir string) (string, error) {
  980. if !strings.HasPrefix(entryPath, baseDir) {
  981. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  982. }
  983. entryPath = strings.TrimPrefix(entryPath, baseDir)
  984. return strings.TrimPrefix(entryPath, "/"), nil
  985. }
  986. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  987. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  988. return nil, nil, conn.GetPermissionDeniedError()
  989. }
  990. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  991. if err != nil {
  992. return nil, nil, err
  993. }
  994. f, r, cancelFn, err := fs.Open(fsPath, 0)
  995. if err != nil {
  996. return nil, nil, conn.GetFsError(fs, err)
  997. }
  998. if cancelFn == nil {
  999. cancelFn = func() {}
  1000. }
  1001. if f != nil {
  1002. return f, cancelFn, nil
  1003. }
  1004. return r, cancelFn, nil
  1005. }
  1006. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  1007. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1008. if err != nil {
  1009. return err
  1010. }
  1011. defer cancelFn()
  1012. defer reader.Close()
  1013. _, err = io.Copy(w, reader)
  1014. return err
  1015. }
  1016. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  1017. return func(w io.Writer) (int64, error) {
  1018. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1019. if err != nil {
  1020. return 0, err
  1021. }
  1022. defer cancelFn()
  1023. defer reader.Close()
  1024. return io.CopyN(w, reader, size)
  1025. }
  1026. }
  1027. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  1028. var files []*mail.File
  1029. totalSize := int64(0)
  1030. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  1031. info, err := conn.DoStat(virtualPath, 0, false)
  1032. if err != nil {
  1033. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  1034. }
  1035. if !info.Mode().IsRegular() {
  1036. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  1037. }
  1038. totalSize += info.Size()
  1039. if totalSize > maxAttachmentsSize {
  1040. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  1041. }
  1042. files = append(files, &mail.File{
  1043. Name: path.Base(virtualPath),
  1044. Header: make(map[string][]string),
  1045. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1046. })
  1047. }
  1048. return files, nil
  1049. }
  1050. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  1051. if !strings.Contains(input, "{{") {
  1052. return input
  1053. }
  1054. return replacer.Replace(input)
  1055. }
  1056. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1057. var matched bool
  1058. var err error
  1059. if strings.Contains(p.Pattern, "**") {
  1060. matched, err = doublestar.Match(p.Pattern, name)
  1061. } else {
  1062. matched, err = path.Match(p.Pattern, name)
  1063. }
  1064. if err != nil {
  1065. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1066. return false
  1067. }
  1068. if p.InverseMatch {
  1069. return !matched
  1070. }
  1071. return matched
  1072. }
  1073. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1074. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1075. return false
  1076. }
  1077. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1078. return false
  1079. }
  1080. if !checkEventGroupConditionPatterns(user.Groups, conditions.GroupNames) {
  1081. return false
  1082. }
  1083. return true
  1084. }
  1085. // checkConditionPatterns returns false if patterns are defined and no match is found
  1086. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1087. if len(patterns) == 0 {
  1088. return true
  1089. }
  1090. matches := false
  1091. for _, p := range patterns {
  1092. // assume, that multiple InverseMatches are set
  1093. if p.InverseMatch {
  1094. if checkEventConditionPattern(p, name) {
  1095. matches = true
  1096. } else {
  1097. return false
  1098. }
  1099. } else if checkEventConditionPattern(p, name) {
  1100. return true
  1101. }
  1102. }
  1103. return matches
  1104. }
  1105. func checkEventGroupConditionPatterns(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1106. if len(patterns) == 0 {
  1107. return true
  1108. }
  1109. matches := false
  1110. for _, group := range groups {
  1111. for _, p := range patterns {
  1112. // assume, that multiple InverseMatches are set
  1113. if p.InverseMatch {
  1114. if checkEventConditionPattern(p, group.Name) {
  1115. matches = true
  1116. } else {
  1117. return false
  1118. }
  1119. } else {
  1120. if checkEventConditionPattern(p, group.Name) {
  1121. return true
  1122. }
  1123. }
  1124. }
  1125. }
  1126. return matches
  1127. }
  1128. func getHTTPRuleActionEndpoint(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1129. u, err := url.Parse(c.Endpoint)
  1130. if err != nil {
  1131. return "", fmt.Errorf("invalid endpoint: %w", err)
  1132. }
  1133. if strings.Contains(u.Path, "{{") {
  1134. pathComponents := strings.Split(u.Path, "/")
  1135. for idx := range pathComponents {
  1136. part := replaceWithReplacer(pathComponents[idx], replacer)
  1137. if part != pathComponents[idx] {
  1138. pathComponents[idx] = url.PathEscape(part)
  1139. }
  1140. }
  1141. u.Path = ""
  1142. u = u.JoinPath(pathComponents...)
  1143. }
  1144. if len(c.QueryParameters) > 0 {
  1145. q := u.Query()
  1146. for _, keyVal := range c.QueryParameters {
  1147. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1148. }
  1149. u.RawQuery = q.Encode()
  1150. }
  1151. return u.String(), nil
  1152. }
  1153. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1154. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1155. ) error {
  1156. partWriter, err := m.CreatePart(h)
  1157. if err != nil {
  1158. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1159. return err
  1160. }
  1161. if part.Body != "" {
  1162. cType := h.Get("Content-Type")
  1163. if strings.Contains(strings.ToLower(cType), "application/json") {
  1164. replacements := params.getStringReplacements(addObjectData, true)
  1165. jsonReplacer := strings.NewReplacer(replacements...)
  1166. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, jsonReplacer)))
  1167. } else {
  1168. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  1169. }
  1170. if err != nil {
  1171. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1172. return err
  1173. }
  1174. return nil
  1175. }
  1176. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1177. data, err := params.getCompressedDataRetentionReport()
  1178. if err != nil {
  1179. return err
  1180. }
  1181. _, err = partWriter.Write(data)
  1182. if err != nil {
  1183. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1184. return err
  1185. }
  1186. return nil
  1187. }
  1188. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1189. if err != nil {
  1190. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1191. return err
  1192. }
  1193. return nil
  1194. }
  1195. func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1196. cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
  1197. ) (io.Reader, string, error) {
  1198. var body io.Reader
  1199. if c.Method == http.MethodGet {
  1200. return body, "", nil
  1201. }
  1202. if c.Body != "" {
  1203. if c.Body == dataprovider.RetentionReportPlaceHolder {
  1204. data, err := params.getCompressedDataRetentionReport()
  1205. if err != nil {
  1206. return body, "", err
  1207. }
  1208. return bytes.NewBuffer(data), "", nil
  1209. }
  1210. if c.HasJSONBody() {
  1211. replacements := params.getStringReplacements(addObjectData, true)
  1212. jsonReplacer := strings.NewReplacer(replacements...)
  1213. return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
  1214. }
  1215. return bytes.NewBufferString(replaceWithReplacer(c.Body, replacer)), "", nil
  1216. }
  1217. if len(c.Parts) > 0 {
  1218. r, w := io.Pipe()
  1219. m := multipart.NewWriter(w)
  1220. var conn *BaseConnection
  1221. if user.Username != "" {
  1222. var err error
  1223. user, err = getUserForEventAction(user)
  1224. if err != nil {
  1225. return body, "", err
  1226. }
  1227. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1228. err = user.CheckFsRoot(connectionID)
  1229. if err != nil {
  1230. user.CloseFs() //nolint:errcheck
  1231. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  1232. user.Username, err)
  1233. }
  1234. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1235. }
  1236. go func() {
  1237. defer w.Close()
  1238. defer user.CloseFs() //nolint:errcheck
  1239. for _, part := range c.Parts {
  1240. h := make(textproto.MIMEHeader)
  1241. if part.Body != "" {
  1242. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  1243. } else {
  1244. h.Set("Content-Disposition",
  1245. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  1246. multipartQuoteEscaper.Replace(part.Name),
  1247. multipartQuoteEscaper.Replace((path.Base(replaceWithReplacer(part.Filepath, replacer))))))
  1248. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  1249. if contentType == "" {
  1250. contentType = "application/octet-stream"
  1251. }
  1252. h.Set("Content-Type", contentType)
  1253. }
  1254. for _, keyVal := range part.Headers {
  1255. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1256. }
  1257. if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
  1258. cancel()
  1259. return
  1260. }
  1261. }
  1262. m.Close()
  1263. }()
  1264. return r, m.FormDataContentType(), nil
  1265. }
  1266. return body, "", nil
  1267. }
  1268. func setHTTPReqHeaders(req *http.Request, c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1269. contentType string,
  1270. ) {
  1271. if contentType != "" {
  1272. req.Header.Set("Content-Type", contentType)
  1273. }
  1274. if c.Username != "" || c.Password.GetPayload() != "" {
  1275. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1276. }
  1277. for _, keyVal := range c.Headers {
  1278. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1279. }
  1280. }
  1281. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1282. if err := c.TryDecryptPassword(); err != nil {
  1283. return err
  1284. }
  1285. addObjectData := false
  1286. if params.Object != nil {
  1287. addObjectData = c.HasObjectData()
  1288. }
  1289. replacements := params.getStringReplacements(addObjectData, false)
  1290. replacer := strings.NewReplacer(replacements...)
  1291. endpoint, err := getHTTPRuleActionEndpoint(&c, replacer)
  1292. if err != nil {
  1293. return err
  1294. }
  1295. ctx, cancel := c.GetContext()
  1296. defer cancel()
  1297. var user dataprovider.User
  1298. if c.HasMultipartFiles() {
  1299. user, err = params.getUserFromSender()
  1300. if err != nil {
  1301. return err
  1302. }
  1303. }
  1304. body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
  1305. if err != nil {
  1306. return err
  1307. }
  1308. if body != nil {
  1309. rc, ok := body.(io.ReadCloser)
  1310. if ok {
  1311. defer rc.Close()
  1312. }
  1313. }
  1314. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1315. if err != nil {
  1316. return err
  1317. }
  1318. setHTTPReqHeaders(req, &c, replacer, contentType)
  1319. client := c.GetHTTPClient()
  1320. defer client.CloseIdleConnections()
  1321. startTime := time.Now()
  1322. resp, err := client.Do(req)
  1323. if err != nil {
  1324. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1325. endpoint, time.Since(startTime), err)
  1326. return fmt.Errorf("error sending HTTP request: %w", err)
  1327. }
  1328. defer resp.Body.Close()
  1329. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1330. endpoint, time.Since(startTime), resp.StatusCode)
  1331. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1332. if rb, err := io.ReadAll(io.LimitReader(resp.Body, 2048)); err == nil {
  1333. eventManagerLog(logger.LevelDebug, "error notification response from endpoint %q: %s", endpoint, string(rb))
  1334. }
  1335. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1336. }
  1337. return nil
  1338. }
  1339. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1340. addObjectData := false
  1341. if params.Object != nil {
  1342. for _, k := range c.EnvVars {
  1343. if strings.Contains(k.Value, objDataPlaceholder) || strings.Contains(k.Value, objDataPlaceholderString) {
  1344. addObjectData = true
  1345. break
  1346. }
  1347. }
  1348. }
  1349. replacements := params.getStringReplacements(addObjectData, false)
  1350. replacer := strings.NewReplacer(replacements...)
  1351. args := make([]string, 0, len(c.Args))
  1352. for _, arg := range c.Args {
  1353. args = append(args, replaceWithReplacer(arg, replacer))
  1354. }
  1355. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1356. defer cancel()
  1357. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1358. cmd.Env = []string{}
  1359. for _, keyVal := range c.EnvVars {
  1360. if keyVal.Value == "$" {
  1361. val := os.Getenv(keyVal.Key)
  1362. if val == "" {
  1363. eventManagerLog(logger.LevelDebug, "empty value for environment variable %q", keyVal.Key)
  1364. }
  1365. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, val))
  1366. } else {
  1367. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1368. }
  1369. }
  1370. startTime := time.Now()
  1371. err := cmd.Run()
  1372. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1373. c.Cmd, time.Since(startTime), err)
  1374. return err
  1375. }
  1376. func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) []string {
  1377. if len(addrs) == 0 {
  1378. return nil
  1379. }
  1380. recipients := make([]string, 0, len(addrs))
  1381. for _, recipient := range addrs {
  1382. rcpt := replaceWithReplacer(recipient, replacer)
  1383. if rcpt != "" {
  1384. recipients = append(recipients, rcpt)
  1385. }
  1386. }
  1387. return recipients
  1388. }
  1389. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1390. addObjectData := false
  1391. if params.Object != nil {
  1392. if strings.Contains(c.Body, objDataPlaceholder) || strings.Contains(c.Body, objDataPlaceholderString) {
  1393. addObjectData = true
  1394. }
  1395. }
  1396. replacements := params.getStringReplacements(addObjectData, false)
  1397. replacer := strings.NewReplacer(replacements...)
  1398. body := replaceWithReplacer(c.Body, replacer)
  1399. subject := replaceWithReplacer(c.Subject, replacer)
  1400. recipients := getEmailAddressesWithReplacer(c.Recipients, replacer)
  1401. bcc := getEmailAddressesWithReplacer(c.Bcc, replacer)
  1402. startTime := time.Now()
  1403. var files []*mail.File
  1404. fileAttachments := make([]string, 0, len(c.Attachments))
  1405. for _, attachment := range c.Attachments {
  1406. if attachment == dataprovider.RetentionReportPlaceHolder {
  1407. f, err := params.getRetentionReportsAsMailAttachment()
  1408. if err != nil {
  1409. return err
  1410. }
  1411. files = append(files, f)
  1412. continue
  1413. }
  1414. fileAttachments = append(fileAttachments, attachment)
  1415. }
  1416. if len(fileAttachments) > 0 {
  1417. user, err := params.getUserFromSender()
  1418. if err != nil {
  1419. return err
  1420. }
  1421. user, err = getUserForEventAction(user)
  1422. if err != nil {
  1423. return err
  1424. }
  1425. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1426. err = user.CheckFsRoot(connectionID)
  1427. defer user.CloseFs() //nolint:errcheck
  1428. if err != nil {
  1429. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1430. }
  1431. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1432. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1433. if err != nil {
  1434. return err
  1435. }
  1436. files = append(files, res...)
  1437. }
  1438. err := smtp.SendEmail(recipients, bcc, subject, body, smtp.EmailContentType(c.ContentType), files...)
  1439. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1440. time.Since(startTime), err)
  1441. if err != nil {
  1442. return fmt.Errorf("unable to send email: %w", err)
  1443. }
  1444. return nil
  1445. }
  1446. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1447. err := user.LoadAndApplyGroupSettings()
  1448. if err != nil {
  1449. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1450. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1451. }
  1452. user.UploadDataTransfer = 0
  1453. user.UploadBandwidth = 0
  1454. user.DownloadBandwidth = 0
  1455. user.Filters.DisableFsChecks = false
  1456. user.Filters.FilePatterns = nil
  1457. user.Filters.BandwidthLimits = nil
  1458. for k := range user.Permissions {
  1459. user.Permissions[k] = []string{dataprovider.PermAny}
  1460. }
  1461. return user, nil
  1462. }
  1463. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1464. results := make([]string, 0, len(paths))
  1465. for _, p := range paths {
  1466. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1467. }
  1468. return util.RemoveDuplicates(results, false)
  1469. }
  1470. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1471. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1472. if err != nil {
  1473. return err
  1474. }
  1475. return conn.RemoveFile(fs, fsPath, item, info)
  1476. }
  1477. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1478. user, err := getUserForEventAction(user)
  1479. if err != nil {
  1480. return err
  1481. }
  1482. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1483. err = user.CheckFsRoot(connectionID)
  1484. defer user.CloseFs() //nolint:errcheck
  1485. if err != nil {
  1486. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1487. }
  1488. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1489. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1490. info, err := conn.DoStat(item, 0, false)
  1491. if err != nil {
  1492. if conn.IsNotExistError(err) {
  1493. continue
  1494. }
  1495. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1496. }
  1497. if info.IsDir() {
  1498. if err = conn.RemoveDir(item); err != nil {
  1499. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1500. }
  1501. } else {
  1502. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1503. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1504. }
  1505. }
  1506. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1507. }
  1508. return nil
  1509. }
  1510. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1511. conditions dataprovider.ConditionOptions, params *EventParams,
  1512. ) error {
  1513. users, err := params.getUsers()
  1514. if err != nil {
  1515. return fmt.Errorf("unable to get users: %w", err)
  1516. }
  1517. var failures []string
  1518. executed := 0
  1519. for _, user := range users {
  1520. // if sender is set, the conditions have already been evaluated
  1521. if params.sender == "" {
  1522. if !checkUserConditionOptions(&user, &conditions) {
  1523. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1524. user.Username)
  1525. continue
  1526. }
  1527. }
  1528. executed++
  1529. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1530. params.AddError(err)
  1531. failures = append(failures, user.Username)
  1532. }
  1533. }
  1534. if len(failures) > 0 {
  1535. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1536. }
  1537. if executed == 0 {
  1538. eventManagerLog(logger.LevelError, "no delete executed")
  1539. return errors.New("no delete executed")
  1540. }
  1541. return nil
  1542. }
  1543. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1544. user, err := getUserForEventAction(user)
  1545. if err != nil {
  1546. return err
  1547. }
  1548. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1549. err = user.CheckFsRoot(connectionID)
  1550. defer user.CloseFs() //nolint:errcheck
  1551. if err != nil {
  1552. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1553. }
  1554. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1555. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1556. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1557. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1558. }
  1559. if err = conn.createDirIfMissing(item); err != nil {
  1560. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1561. }
  1562. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1563. }
  1564. return nil
  1565. }
  1566. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1567. conditions dataprovider.ConditionOptions, params *EventParams,
  1568. ) error {
  1569. users, err := params.getUsers()
  1570. if err != nil {
  1571. return fmt.Errorf("unable to get users: %w", err)
  1572. }
  1573. var failures []string
  1574. executed := 0
  1575. for _, user := range users {
  1576. // if sender is set, the conditions have already been evaluated
  1577. if params.sender == "" {
  1578. if !checkUserConditionOptions(&user, &conditions) {
  1579. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1580. user.Username)
  1581. continue
  1582. }
  1583. }
  1584. executed++
  1585. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1586. failures = append(failures, user.Username)
  1587. }
  1588. }
  1589. if len(failures) > 0 {
  1590. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1591. }
  1592. if executed == 0 {
  1593. eventManagerLog(logger.LevelError, "no mkdir executed")
  1594. return errors.New("no mkdir executed")
  1595. }
  1596. return nil
  1597. }
  1598. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1599. user dataprovider.User,
  1600. ) error {
  1601. user, err := getUserForEventAction(user)
  1602. if err != nil {
  1603. return err
  1604. }
  1605. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1606. err = user.CheckFsRoot(connectionID)
  1607. defer user.CloseFs() //nolint:errcheck
  1608. if err != nil {
  1609. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1610. }
  1611. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1612. for _, item := range renames {
  1613. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1614. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1615. if err = conn.renameInternal(source, target, true); err != nil {
  1616. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1617. }
  1618. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1619. }
  1620. return nil
  1621. }
  1622. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1623. user dataprovider.User,
  1624. ) error {
  1625. user, err := getUserForEventAction(user)
  1626. if err != nil {
  1627. return err
  1628. }
  1629. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1630. err = user.CheckFsRoot(connectionID)
  1631. defer user.CloseFs() //nolint:errcheck
  1632. if err != nil {
  1633. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1634. }
  1635. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1636. for _, item := range copy {
  1637. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1638. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1639. if strings.HasSuffix(item.Key, "/") {
  1640. source += "/"
  1641. }
  1642. if strings.HasSuffix(item.Value, "/") {
  1643. target += "/"
  1644. }
  1645. if err = conn.Copy(source, target); err != nil {
  1646. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1647. }
  1648. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1649. }
  1650. return nil
  1651. }
  1652. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1653. user dataprovider.User,
  1654. ) error {
  1655. user, err := getUserForEventAction(user)
  1656. if err != nil {
  1657. return err
  1658. }
  1659. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1660. err = user.CheckFsRoot(connectionID)
  1661. defer user.CloseFs() //nolint:errcheck
  1662. if err != nil {
  1663. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1664. }
  1665. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1666. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1667. if _, err = conn.DoStat(item, 0, false); err != nil {
  1668. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1669. }
  1670. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1671. }
  1672. return nil
  1673. }
  1674. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1675. conditions dataprovider.ConditionOptions, params *EventParams,
  1676. ) error {
  1677. users, err := params.getUsers()
  1678. if err != nil {
  1679. return fmt.Errorf("unable to get users: %w", err)
  1680. }
  1681. var failures []string
  1682. executed := 0
  1683. for _, user := range users {
  1684. // if sender is set, the conditions have already been evaluated
  1685. if params.sender == "" {
  1686. if !checkUserConditionOptions(&user, &conditions) {
  1687. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1688. user.Username)
  1689. continue
  1690. }
  1691. }
  1692. executed++
  1693. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1694. failures = append(failures, user.Username)
  1695. params.AddError(err)
  1696. }
  1697. }
  1698. if len(failures) > 0 {
  1699. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1700. }
  1701. if executed == 0 {
  1702. eventManagerLog(logger.LevelError, "no rename executed")
  1703. return errors.New("no rename executed")
  1704. }
  1705. return nil
  1706. }
  1707. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1708. conditions dataprovider.ConditionOptions, params *EventParams,
  1709. ) error {
  1710. users, err := params.getUsers()
  1711. if err != nil {
  1712. return fmt.Errorf("unable to get users: %w", err)
  1713. }
  1714. var failures []string
  1715. var executed int
  1716. for _, user := range users {
  1717. // if sender is set, the conditions have already been evaluated
  1718. if params.sender == "" {
  1719. if !checkUserConditionOptions(&user, &conditions) {
  1720. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1721. user.Username)
  1722. continue
  1723. }
  1724. }
  1725. executed++
  1726. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1727. failures = append(failures, user.Username)
  1728. params.AddError(err)
  1729. }
  1730. }
  1731. if len(failures) > 0 {
  1732. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1733. }
  1734. if executed == 0 {
  1735. eventManagerLog(logger.LevelError, "no copy executed")
  1736. return errors.New("no copy executed")
  1737. }
  1738. return nil
  1739. }
  1740. func getArchiveBaseDir(paths []string) string {
  1741. var parentDirs []string
  1742. for _, p := range paths {
  1743. parentDirs = append(parentDirs, path.Dir(p))
  1744. }
  1745. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1746. baseDir := "/"
  1747. if len(parentDirs) == 1 {
  1748. baseDir = parentDirs[0]
  1749. }
  1750. return baseDir
  1751. }
  1752. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1753. if info.IsDir() {
  1754. var dirSize int64
  1755. entries, err := conn.ListDir(p)
  1756. if err != nil {
  1757. return 0, err
  1758. }
  1759. for _, entry := range entries {
  1760. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1761. if err != nil {
  1762. return 0, err
  1763. }
  1764. dirSize += size
  1765. }
  1766. return dirSize, nil
  1767. }
  1768. if info.Mode().IsRegular() {
  1769. return info.Size(), nil
  1770. }
  1771. return 0, nil
  1772. }
  1773. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1774. q, _ := conn.HasSpace(false, false, zipPath)
  1775. if q.HasSpace && q.GetRemainingSize() > 0 {
  1776. var size int64
  1777. for _, item := range paths {
  1778. info, err := conn.DoStat(item, 1, false)
  1779. if err != nil {
  1780. return size, err
  1781. }
  1782. itemSize, err := getSizeForPath(conn, item, info)
  1783. if err != nil {
  1784. return size, err
  1785. }
  1786. size += itemSize
  1787. }
  1788. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1789. // we assume the zip size will be half of the real size
  1790. return size / 2, nil
  1791. }
  1792. return -1, nil
  1793. }
  1794. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1795. user dataprovider.User,
  1796. ) error {
  1797. user, err := getUserForEventAction(user)
  1798. if err != nil {
  1799. return err
  1800. }
  1801. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1802. err = user.CheckFsRoot(connectionID)
  1803. defer user.CloseFs() //nolint:errcheck
  1804. if err != nil {
  1805. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1806. }
  1807. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1808. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1809. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1810. paths := make([]string, 0, len(c.Paths))
  1811. for idx := range c.Paths {
  1812. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1813. if p == name {
  1814. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1815. }
  1816. paths = append(paths, p)
  1817. }
  1818. paths = util.RemoveDuplicates(paths, false)
  1819. estimatedSize, err := estimateZipSize(conn, name, paths)
  1820. if err != nil {
  1821. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1822. return fmt.Errorf("unable to estimate archive size: %w", err)
  1823. }
  1824. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1825. if err != nil {
  1826. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1827. return fmt.Errorf("unable to create archive: %w", err)
  1828. }
  1829. defer cancelFn()
  1830. baseDir := getArchiveBaseDir(paths)
  1831. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1832. zipWriter := &zipWriterWrapper{
  1833. Name: name,
  1834. Writer: zip.NewWriter(writer),
  1835. Entries: make(map[string]bool),
  1836. }
  1837. startTime := time.Now()
  1838. for _, item := range paths {
  1839. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1840. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1841. return err
  1842. }
  1843. }
  1844. if err := zipWriter.Writer.Close(); err != nil {
  1845. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1846. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1847. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1848. }
  1849. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1850. }
  1851. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1852. params *EventParams,
  1853. ) error {
  1854. users, err := params.getUsers()
  1855. if err != nil {
  1856. return fmt.Errorf("unable to get users: %w", err)
  1857. }
  1858. var failures []string
  1859. executed := 0
  1860. for _, user := range users {
  1861. // if sender is set, the conditions have already been evaluated
  1862. if params.sender == "" {
  1863. if !checkUserConditionOptions(&user, &conditions) {
  1864. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1865. user.Username)
  1866. continue
  1867. }
  1868. }
  1869. executed++
  1870. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1871. failures = append(failures, user.Username)
  1872. params.AddError(err)
  1873. }
  1874. }
  1875. if len(failures) > 0 {
  1876. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1877. }
  1878. if executed == 0 {
  1879. eventManagerLog(logger.LevelError, "no existence check executed")
  1880. return errors.New("no existence check executed")
  1881. }
  1882. return nil
  1883. }
  1884. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1885. conditions dataprovider.ConditionOptions, params *EventParams,
  1886. ) error {
  1887. users, err := params.getUsers()
  1888. if err != nil {
  1889. return fmt.Errorf("unable to get users: %w", err)
  1890. }
  1891. var failures []string
  1892. executed := 0
  1893. for _, user := range users {
  1894. // if sender is set, the conditions have already been evaluated
  1895. if params.sender == "" {
  1896. if !checkUserConditionOptions(&user, &conditions) {
  1897. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1898. user.Username)
  1899. continue
  1900. }
  1901. }
  1902. executed++
  1903. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1904. failures = append(failures, user.Username)
  1905. params.AddError(err)
  1906. }
  1907. }
  1908. if len(failures) > 0 {
  1909. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1910. }
  1911. if executed == 0 {
  1912. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1913. return errors.New("no file/folder compressed")
  1914. }
  1915. return nil
  1916. }
  1917. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1918. params *EventParams,
  1919. ) error {
  1920. addObjectData := false
  1921. replacements := params.getStringReplacements(addObjectData, false)
  1922. replacer := strings.NewReplacer(replacements...)
  1923. switch c.Type {
  1924. case dataprovider.FilesystemActionRename:
  1925. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1926. case dataprovider.FilesystemActionDelete:
  1927. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1928. case dataprovider.FilesystemActionMkdirs:
  1929. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1930. case dataprovider.FilesystemActionExist:
  1931. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1932. case dataprovider.FilesystemActionCompress:
  1933. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1934. case dataprovider.FilesystemActionCopy:
  1935. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1936. default:
  1937. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1938. }
  1939. }
  1940. func executeQuotaResetForUser(user *dataprovider.User) error {
  1941. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1942. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1943. user.Username, err)
  1944. return err
  1945. }
  1946. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1947. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1948. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1949. }
  1950. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1951. numFiles, size, err := user.ScanQuota()
  1952. if err != nil {
  1953. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1954. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1955. }
  1956. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1957. if err != nil {
  1958. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1959. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1960. }
  1961. return nil
  1962. }
  1963. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1964. users, err := params.getUsers()
  1965. if err != nil {
  1966. return fmt.Errorf("unable to get users: %w", err)
  1967. }
  1968. var failures []string
  1969. executed := 0
  1970. for _, user := range users {
  1971. // if sender is set, the conditions have already been evaluated
  1972. if params.sender == "" {
  1973. if !checkUserConditionOptions(&user, &conditions) {
  1974. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  1975. user.Username)
  1976. continue
  1977. }
  1978. }
  1979. executed++
  1980. if err = executeQuotaResetForUser(&user); err != nil {
  1981. params.AddError(err)
  1982. failures = append(failures, user.Username)
  1983. }
  1984. }
  1985. if len(failures) > 0 {
  1986. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  1987. }
  1988. if executed == 0 {
  1989. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1990. return errors.New("no user quota reset executed")
  1991. }
  1992. return nil
  1993. }
  1994. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1995. folders, err := params.getFolders()
  1996. if err != nil {
  1997. return fmt.Errorf("unable to get folders: %w", err)
  1998. }
  1999. var failures []string
  2000. executed := 0
  2001. for _, folder := range folders {
  2002. // if sender is set, the conditions have already been evaluated
  2003. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  2004. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  2005. folder.Name)
  2006. continue
  2007. }
  2008. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  2009. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  2010. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  2011. failures = append(failures, folder.Name)
  2012. continue
  2013. }
  2014. executed++
  2015. f := vfs.VirtualFolder{
  2016. BaseVirtualFolder: folder,
  2017. VirtualPath: "/",
  2018. }
  2019. numFiles, size, err := f.ScanQuota()
  2020. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  2021. if err != nil {
  2022. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  2023. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  2024. failures = append(failures, folder.Name)
  2025. continue
  2026. }
  2027. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  2028. if err != nil {
  2029. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  2030. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  2031. failures = append(failures, folder.Name)
  2032. }
  2033. }
  2034. if len(failures) > 0 {
  2035. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  2036. }
  2037. if executed == 0 {
  2038. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  2039. return errors.New("no folder quota reset executed")
  2040. }
  2041. return nil
  2042. }
  2043. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2044. users, err := params.getUsers()
  2045. if err != nil {
  2046. return fmt.Errorf("unable to get users: %w", err)
  2047. }
  2048. var failures []string
  2049. executed := 0
  2050. for _, user := range users {
  2051. // if sender is set, the conditions have already been evaluated
  2052. if params.sender == "" {
  2053. if !checkUserConditionOptions(&user, &conditions) {
  2054. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  2055. user.Username)
  2056. continue
  2057. }
  2058. }
  2059. executed++
  2060. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  2061. if err != nil {
  2062. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  2063. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  2064. failures = append(failures, user.Username)
  2065. }
  2066. }
  2067. if len(failures) > 0 {
  2068. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  2069. }
  2070. if executed == 0 {
  2071. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  2072. return errors.New("no transfer quota reset executed")
  2073. }
  2074. return nil
  2075. }
  2076. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  2077. params *EventParams, actionName string,
  2078. ) error {
  2079. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2080. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  2081. user.Username, err)
  2082. return err
  2083. }
  2084. check := RetentionCheck{
  2085. Folders: folders,
  2086. }
  2087. c := RetentionChecks.Add(check, &user)
  2088. if c == nil {
  2089. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  2090. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2091. }
  2092. defer func() {
  2093. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2094. Username: user.Username,
  2095. ActionName: actionName,
  2096. Results: c.results,
  2097. })
  2098. }()
  2099. if err := c.Start(); err != nil {
  2100. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2101. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2102. }
  2103. return nil
  2104. }
  2105. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2106. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2107. ) error {
  2108. users, err := params.getUsers()
  2109. if err != nil {
  2110. return fmt.Errorf("unable to get users: %w", err)
  2111. }
  2112. var failures []string
  2113. executed := 0
  2114. for _, user := range users {
  2115. // if sender is set, the conditions have already been evaluated
  2116. if params.sender == "" {
  2117. if !checkUserConditionOptions(&user, &conditions) {
  2118. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2119. user.Username)
  2120. continue
  2121. }
  2122. }
  2123. executed++
  2124. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2125. failures = append(failures, user.Username)
  2126. params.AddError(err)
  2127. }
  2128. }
  2129. if len(failures) > 0 {
  2130. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2131. }
  2132. if executed == 0 {
  2133. eventManagerLog(logger.LevelError, "no retention check executed")
  2134. return errors.New("no retention check executed")
  2135. }
  2136. return nil
  2137. }
  2138. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2139. users, err := params.getUsers()
  2140. if err != nil {
  2141. return fmt.Errorf("unable to get users: %w", err)
  2142. }
  2143. var failures []string
  2144. var executed int
  2145. for _, user := range users {
  2146. // if sender is set, the conditions have already been evaluated
  2147. if params.sender == "" {
  2148. if !checkUserConditionOptions(&user, &conditions) {
  2149. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2150. user.Username)
  2151. continue
  2152. }
  2153. }
  2154. executed++
  2155. if user.ExpirationDate > 0 {
  2156. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2157. if expDate.Before(time.Now()) {
  2158. failures = append(failures, user.Username)
  2159. }
  2160. }
  2161. }
  2162. if len(failures) > 0 {
  2163. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2164. }
  2165. if executed == 0 {
  2166. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2167. return errors.New("no user expiration check executed")
  2168. }
  2169. return nil
  2170. }
  2171. func executeMetadataCheckForUser(user *dataprovider.User) error {
  2172. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2173. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  2174. user.Username, err)
  2175. return err
  2176. }
  2177. if !ActiveMetadataChecks.Add(user.Username, user.Role) {
  2178. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  2179. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  2180. }
  2181. defer ActiveMetadataChecks.Remove(user.Username)
  2182. if err := user.CheckMetadataConsistency(); err != nil {
  2183. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  2184. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  2185. }
  2186. return nil
  2187. }
  2188. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2189. users, err := params.getUsers()
  2190. if err != nil {
  2191. return fmt.Errorf("unable to get users: %w", err)
  2192. }
  2193. var failures []string
  2194. var executed int
  2195. for _, user := range users {
  2196. // if sender is set, the conditions have already been evaluated
  2197. if params.sender == "" {
  2198. if !checkUserConditionOptions(&user, &conditions) {
  2199. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, condition options don't match",
  2200. user.Username)
  2201. continue
  2202. }
  2203. }
  2204. executed++
  2205. if err = executeMetadataCheckForUser(&user); err != nil {
  2206. params.AddError(err)
  2207. failures = append(failures, user.Username)
  2208. }
  2209. }
  2210. if len(failures) > 0 {
  2211. return fmt.Errorf("metadata check failed for users: %s", strings.Join(failures, ", "))
  2212. }
  2213. if executed == 0 {
  2214. eventManagerLog(logger.LevelError, "no metadata check executed")
  2215. return errors.New("no metadata check executed")
  2216. }
  2217. return nil
  2218. }
  2219. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2220. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2221. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2222. user.Username, err)
  2223. return err
  2224. }
  2225. if user.ExpirationDate > 0 {
  2226. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2227. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2228. user.Username, expDate)
  2229. return nil
  2230. }
  2231. }
  2232. if user.Filters.PasswordExpiration == 0 {
  2233. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2234. return nil
  2235. }
  2236. days := user.PasswordExpiresIn()
  2237. if days > config.Threshold {
  2238. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2239. user.Username, days, config.Threshold)
  2240. return nil
  2241. }
  2242. body := new(bytes.Buffer)
  2243. data := make(map[string]any)
  2244. data["Username"] = user.Username
  2245. data["Days"] = days
  2246. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2247. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2248. user.Username, err)
  2249. return err
  2250. }
  2251. subject := "SFTPGo password expiration notification"
  2252. startTime := time.Now()
  2253. if err := smtp.SendEmail([]string{user.Email}, nil, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2254. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2255. user.Username, err, time.Since(startTime))
  2256. return err
  2257. }
  2258. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2259. user.Username, days, time.Since(startTime))
  2260. return nil
  2261. }
  2262. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2263. params *EventParams) error {
  2264. users, err := params.getUsers()
  2265. if err != nil {
  2266. return fmt.Errorf("unable to get users: %w", err)
  2267. }
  2268. var failures []string
  2269. for _, user := range users {
  2270. // if sender is set, the conditions have already been evaluated
  2271. if params.sender == "" {
  2272. if !checkUserConditionOptions(&user, &conditions) {
  2273. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2274. user.Username)
  2275. continue
  2276. }
  2277. }
  2278. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2279. params.AddError(err)
  2280. failures = append(failures, user.Username)
  2281. }
  2282. }
  2283. if len(failures) > 0 {
  2284. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2285. }
  2286. return nil
  2287. }
  2288. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2289. admin, err := dataprovider.AdminExists(params.Name)
  2290. exists := err == nil
  2291. if exists && c.Mode == 1 {
  2292. return &admin, nil
  2293. }
  2294. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2295. return nil, err
  2296. }
  2297. replacements := params.getStringReplacements(false, true)
  2298. replacer := strings.NewReplacer(replacements...)
  2299. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2300. var newAdmin dataprovider.Admin
  2301. err = json.Unmarshal([]byte(data), &newAdmin)
  2302. if err != nil {
  2303. return nil, err
  2304. }
  2305. if newAdmin.Password == "" {
  2306. newAdmin.Password = util.GenerateUniqueID()
  2307. }
  2308. if exists {
  2309. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2310. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2311. } else {
  2312. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2313. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2314. }
  2315. return &newAdmin, err
  2316. }
  2317. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2318. user, err := dataprovider.UserExists(params.Name, "")
  2319. exists := err == nil
  2320. if exists && c.Mode == 1 {
  2321. err = user.LoadAndApplyGroupSettings()
  2322. return &user, err
  2323. }
  2324. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2325. return nil, err
  2326. }
  2327. replacements := params.getStringReplacements(false, true)
  2328. replacer := strings.NewReplacer(replacements...)
  2329. data := replaceWithReplacer(c.TemplateUser, replacer)
  2330. var newUser dataprovider.User
  2331. err = json.Unmarshal([]byte(data), &newUser)
  2332. if err != nil {
  2333. return nil, err
  2334. }
  2335. if exists {
  2336. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2337. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2338. } else {
  2339. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2340. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2341. }
  2342. if err != nil {
  2343. return nil, err
  2344. }
  2345. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2346. return &u, err
  2347. }
  2348. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  2349. conditions dataprovider.ConditionOptions,
  2350. ) error {
  2351. var err error
  2352. switch action.Type {
  2353. case dataprovider.ActionTypeHTTP:
  2354. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2355. case dataprovider.ActionTypeCommand:
  2356. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2357. case dataprovider.ActionTypeEmail:
  2358. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2359. case dataprovider.ActionTypeBackup:
  2360. var backupPath string
  2361. backupPath, err = dataprovider.ExecuteBackup()
  2362. if err == nil {
  2363. params.setBackupParams(backupPath)
  2364. }
  2365. case dataprovider.ActionTypeUserQuotaReset:
  2366. err = executeUsersQuotaResetRuleAction(conditions, params)
  2367. case dataprovider.ActionTypeFolderQuotaReset:
  2368. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2369. case dataprovider.ActionTypeTransferQuotaReset:
  2370. err = executeTransferQuotaResetRuleAction(conditions, params)
  2371. case dataprovider.ActionTypeDataRetentionCheck:
  2372. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2373. case dataprovider.ActionTypeMetadataCheck:
  2374. err = executeMetadataCheckRuleAction(conditions, params)
  2375. case dataprovider.ActionTypeFilesystem:
  2376. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2377. case dataprovider.ActionTypePasswordExpirationCheck:
  2378. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2379. case dataprovider.ActionTypeUserExpirationCheck:
  2380. err = executeUserExpirationCheckRuleAction(conditions, params)
  2381. default:
  2382. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2383. }
  2384. if err != nil {
  2385. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2386. }
  2387. params.AddError(err)
  2388. return err
  2389. }
  2390. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2391. *dataprovider.Admin, error,
  2392. ) {
  2393. for _, action := range rule.Actions {
  2394. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2395. startTime := time.Now()
  2396. var user *dataprovider.User
  2397. var admin *dataprovider.Admin
  2398. var err error
  2399. var failedActions []string
  2400. paramsCopy := params.getACopy()
  2401. switch params.Event {
  2402. case IDPLoginAdmin:
  2403. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2404. case IDPLoginUser:
  2405. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2406. default:
  2407. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2408. }
  2409. if err != nil {
  2410. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2411. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2412. failedActions = append(failedActions, action.Name)
  2413. } else {
  2414. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2415. action.Name, rule.Name, time.Since(startTime))
  2416. }
  2417. // execute async actions if any, including failure actions
  2418. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2419. return user, admin, err
  2420. }
  2421. }
  2422. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2423. return nil, nil, errors.New("no action executed")
  2424. }
  2425. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2426. var errRes error
  2427. for _, rule := range rules {
  2428. var failedActions []string
  2429. paramsCopy := params.getACopy()
  2430. for _, action := range rule.Actions {
  2431. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2432. startTime := time.Now()
  2433. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2434. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2435. action.Name, rule.Name, time.Since(startTime), err)
  2436. failedActions = append(failedActions, action.Name)
  2437. // we return the last error, it is ok for now
  2438. errRes = err
  2439. if action.Options.StopOnFailure {
  2440. break
  2441. }
  2442. } else {
  2443. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2444. action.Name, rule.Name, time.Since(startTime))
  2445. }
  2446. }
  2447. }
  2448. // execute async actions if any, including failure actions
  2449. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2450. }
  2451. return errRes
  2452. }
  2453. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2454. eventManager.addAsyncTask()
  2455. defer eventManager.removeAsyncTask()
  2456. params.addUID()
  2457. for _, rule := range rules {
  2458. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2459. }
  2460. }
  2461. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2462. for _, action := range rule.Actions {
  2463. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2464. startTime := time.Now()
  2465. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2466. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2467. action.Name, rule.Name, time.Since(startTime), err)
  2468. failedActions = append(failedActions, action.Name)
  2469. if action.Options.StopOnFailure {
  2470. break
  2471. }
  2472. } else {
  2473. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2474. action.Name, rule.Name, time.Since(startTime))
  2475. }
  2476. }
  2477. }
  2478. if len(failedActions) > 0 {
  2479. params.updateStatusFromError = false
  2480. // execute failure actions
  2481. for _, action := range rule.Actions {
  2482. if action.Options.IsFailureAction {
  2483. startTime := time.Now()
  2484. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2485. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2486. action.Name, rule.Name, time.Since(startTime), err)
  2487. if action.Options.StopOnFailure {
  2488. break
  2489. }
  2490. } else {
  2491. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2492. action.Name, rule.Name, time.Since(startTime))
  2493. }
  2494. }
  2495. }
  2496. }
  2497. }
  2498. type eventCronJob struct {
  2499. ruleName string
  2500. }
  2501. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2502. if rule.GuardFromConcurrentExecution() {
  2503. task, err := dataprovider.GetTaskByName(rule.Name)
  2504. if err != nil {
  2505. if errors.Is(err, util.ErrNotFound) {
  2506. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2507. task = dataprovider.Task{
  2508. Name: rule.Name,
  2509. UpdateAt: 0,
  2510. Version: 0,
  2511. }
  2512. err = dataprovider.AddTask(rule.Name)
  2513. if err != nil {
  2514. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2515. return task, err
  2516. }
  2517. } else {
  2518. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2519. }
  2520. }
  2521. return task, err
  2522. }
  2523. return dataprovider.Task{}, nil
  2524. }
  2525. func (j *eventCronJob) Run() {
  2526. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2527. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2528. if err != nil {
  2529. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2530. return
  2531. }
  2532. if err := rule.CheckActionsConsistency(""); err != nil {
  2533. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2534. return
  2535. }
  2536. task, err := j.getTask(&rule)
  2537. if err != nil {
  2538. return
  2539. }
  2540. if task.Name != "" {
  2541. updateInterval := 5 * time.Minute
  2542. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2543. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2544. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2545. return
  2546. }
  2547. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2548. if err != nil {
  2549. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2550. rule.Name, err)
  2551. return
  2552. }
  2553. ticker := time.NewTicker(updateInterval)
  2554. done := make(chan bool)
  2555. defer func() {
  2556. done <- true
  2557. ticker.Stop()
  2558. }()
  2559. go func(taskName string) {
  2560. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2561. for {
  2562. select {
  2563. case <-done:
  2564. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2565. return
  2566. case <-ticker.C:
  2567. err := dataprovider.UpdateTaskTimestamp(taskName)
  2568. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2569. }
  2570. }
  2571. }(task.Name)
  2572. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2573. } else {
  2574. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2575. }
  2576. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2577. }
  2578. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2579. func RunOnDemandRule(name string) error {
  2580. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2581. rule, err := dataprovider.EventRuleExists(name)
  2582. if err != nil {
  2583. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2584. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2585. }
  2586. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2587. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2588. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2589. }
  2590. if rule.Status != 1 {
  2591. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2592. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2593. }
  2594. if err := rule.CheckActionsConsistency(""); err != nil {
  2595. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2596. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2597. }
  2598. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2599. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2600. return nil
  2601. }
  2602. type zipWriterWrapper struct {
  2603. Name string
  2604. Entries map[string]bool
  2605. Writer *zip.Writer
  2606. }
  2607. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2608. logger.Log(level, "eventmanager", "", format, v...)
  2609. }