eventmanager.go 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "slices"
  33. "strconv"
  34. "strings"
  35. "sync"
  36. "sync/atomic"
  37. "time"
  38. "github.com/bmatcuk/doublestar/v4"
  39. "github.com/klauspost/compress/zip"
  40. "github.com/robfig/cron/v3"
  41. "github.com/rs/xid"
  42. "github.com/sftpgo/sdk"
  43. "github.com/wneessen/go-mail"
  44. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  45. "github.com/drakkan/sftpgo/v2/internal/logger"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/smtp"
  48. "github.com/drakkan/sftpgo/v2/internal/util"
  49. "github.com/drakkan/sftpgo/v2/internal/vfs"
  50. )
  // Internal constants used by the event manager.
  const (
  	ipBlockedEventName string = "IP Blocked"
  	// maxAttachmentsSize is 10 MiB expressed in bytes.
  	maxAttachmentsSize int64 = 10 * 1024 * 1024
  	// Placeholder tokens; their expansion is implemented outside this declaration.
  	objDataPlaceholder       string = "{{ObjectData}}"
  	objDataPlaceholderString string = "{{ObjectDataString}}"
  )
  // Supported IDP login events
  const (
  	// IDPLoginUser is the event name compared against EventParams.Event for user logins.
  	IDPLoginUser = "IDP login user"
  	// IDPLoginAdmin is the event name compared against EventParams.Event for admin logins.
  	IDPLoginAdmin = "IDP login admin"
  )
  62. var (
  63. // eventManager handle the supported event rules actions
  64. eventManager eventRulesContainer
  65. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  66. )
  67. func init() {
  68. eventManager = eventRulesContainer{
  69. schedulesMapping: make(map[string][]cron.EntryID),
  70. // arbitrary maximum number of concurrent asynchronous tasks,
  71. // each task could execute multiple actions
  72. concurrencyGuard: make(chan struct{}, 200),
  73. }
  74. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  75. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  76. p := EventParams{
  77. Name: executor,
  78. ObjectName: objectName,
  79. Event: operation,
  80. Status: 1,
  81. ObjectType: objectType,
  82. IP: ip,
  83. Role: role,
  84. Timestamp: time.Now().UnixNano(),
  85. Object: object,
  86. }
  87. if u, ok := object.(*dataprovider.User); ok {
  88. p.Email = u.Email
  89. } else if a, ok := object.(*dataprovider.Admin); ok {
  90. p.Email = a.Email
  91. }
  92. eventManager.handleProviderEvent(p)
  93. })
  94. }
  95. // HandleCertificateEvent checks and executes action rules for certificate events
  96. func HandleCertificateEvent(params EventParams) {
  97. eventManager.handleCertificateEvent(params)
  98. }
  99. // HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider
  100. func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
  101. return eventManager.handleIDPLoginEvent(params, customFields)
  102. }
  103. // eventRulesContainer stores event rules by trigger
  104. type eventRulesContainer struct {
  105. sync.RWMutex
  106. lastLoad atomic.Int64
  107. FsEvents []dataprovider.EventRule
  108. ProviderEvents []dataprovider.EventRule
  109. Schedules []dataprovider.EventRule
  110. IPBlockedEvents []dataprovider.EventRule
  111. CertificateEvents []dataprovider.EventRule
  112. IPDLoginEvents []dataprovider.EventRule
  113. schedulesMapping map[string][]cron.EntryID
  114. concurrencyGuard chan struct{}
  115. }
  116. func (r *eventRulesContainer) addAsyncTask() {
  117. activeHooks.Add(1)
  118. r.concurrencyGuard <- struct{}{}
  119. }
  120. func (r *eventRulesContainer) removeAsyncTask() {
  121. activeHooks.Add(-1)
  122. <-r.concurrencyGuard
  123. }
  124. func (r *eventRulesContainer) getLastLoadTime() int64 {
  125. return r.lastLoad.Load()
  126. }
  127. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  128. r.lastLoad.Store(modTime)
  129. }
  130. // RemoveRule deletes the rule with the specified name
  131. func (r *eventRulesContainer) RemoveRule(name string) {
  132. r.Lock()
  133. defer r.Unlock()
  134. r.removeRuleInternal(name)
  135. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  136. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  137. }
  138. func (r *eventRulesContainer) removeRuleInternal(name string) {
  139. for idx := range r.FsEvents {
  140. if r.FsEvents[idx].Name == name {
  141. lastIdx := len(r.FsEvents) - 1
  142. r.FsEvents[idx] = r.FsEvents[lastIdx]
  143. r.FsEvents = r.FsEvents[:lastIdx]
  144. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  145. return
  146. }
  147. }
  148. for idx := range r.ProviderEvents {
  149. if r.ProviderEvents[idx].Name == name {
  150. lastIdx := len(r.ProviderEvents) - 1
  151. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  152. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  153. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  154. return
  155. }
  156. }
  157. for idx := range r.IPBlockedEvents {
  158. if r.IPBlockedEvents[idx].Name == name {
  159. lastIdx := len(r.IPBlockedEvents) - 1
  160. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  161. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  162. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  163. return
  164. }
  165. }
  166. for idx := range r.CertificateEvents {
  167. if r.CertificateEvents[idx].Name == name {
  168. lastIdx := len(r.CertificateEvents) - 1
  169. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  170. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  171. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  172. return
  173. }
  174. }
  175. for idx := range r.IPDLoginEvents {
  176. if r.IPDLoginEvents[idx].Name == name {
  177. lastIdx := len(r.IPDLoginEvents) - 1
  178. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  179. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  180. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  181. return
  182. }
  183. }
  184. for idx := range r.Schedules {
  185. if r.Schedules[idx].Name == name {
  186. if schedules, ok := r.schedulesMapping[name]; ok {
  187. for _, entryID := range schedules {
  188. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  189. eventScheduler.Remove(entryID)
  190. }
  191. delete(r.schedulesMapping, name)
  192. }
  193. lastIdx := len(r.Schedules) - 1
  194. r.Schedules[idx] = r.Schedules[lastIdx]
  195. r.Schedules = r.Schedules[:lastIdx]
  196. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  197. return
  198. }
  199. }
  200. }
  201. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  202. r.removeRuleInternal(rule.Name)
  203. if rule.DeletedAt > 0 {
  204. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  205. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  206. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  207. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  208. }
  209. return
  210. }
  211. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  212. return
  213. }
  214. switch rule.Trigger {
  215. case dataprovider.EventTriggerFsEvent:
  216. r.FsEvents = append(r.FsEvents, rule)
  217. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  218. case dataprovider.EventTriggerProviderEvent:
  219. r.ProviderEvents = append(r.ProviderEvents, rule)
  220. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  221. case dataprovider.EventTriggerIPBlocked:
  222. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  223. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  224. case dataprovider.EventTriggerCertificate:
  225. r.CertificateEvents = append(r.CertificateEvents, rule)
  226. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  227. case dataprovider.EventTriggerIDPLogin:
  228. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  229. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  230. case dataprovider.EventTriggerSchedule:
  231. for _, schedule := range rule.Conditions.Schedules {
  232. cronSpec := schedule.GetCronSpec()
  233. job := &eventCronJob{
  234. ruleName: dataprovider.ConvertName(rule.Name),
  235. }
  236. entryID, err := eventScheduler.AddJob(cronSpec, job)
  237. if err != nil {
  238. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  239. return
  240. }
  241. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  242. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  243. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  244. }
  245. r.Schedules = append(r.Schedules, rule)
  246. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  247. default:
  248. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  249. }
  250. }
  251. func (r *eventRulesContainer) loadRules() {
  252. eventManagerLog(logger.LevelDebug, "loading updated rules")
  253. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  254. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  255. if err != nil {
  256. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  257. return
  258. }
  259. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  260. if len(rules) > 0 {
  261. r.Lock()
  262. defer r.Unlock()
  263. for _, rule := range rules {
  264. r.addUpdateRuleInternal(rule)
  265. }
  266. }
  267. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  268. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  269. r.setLastLoadTime(modTime)
  270. }
  271. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  272. switch conditions.IDPLoginEvent {
  273. case dataprovider.IDPLoginUser:
  274. if params.Event != IDPLoginUser {
  275. return false
  276. }
  277. case dataprovider.IDPLoginAdmin:
  278. if params.Event != IDPLoginAdmin {
  279. return false
  280. }
  281. }
  282. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  283. }
  284. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  285. if !slices.Contains(conditions.ProviderEvents, params.Event) {
  286. return false
  287. }
  288. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  289. return false
  290. }
  291. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  292. return false
  293. }
  294. if len(conditions.Options.ProviderObjects) > 0 && !slices.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  295. return false
  296. }
  297. return true
  298. }
  299. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  300. if !slices.Contains(conditions.FsEvents, params.Event) {
  301. return false
  302. }
  303. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  304. return false
  305. }
  306. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  307. return false
  308. }
  309. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  310. return false
  311. }
  312. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  313. return false
  314. }
  315. if len(conditions.Options.Protocols) > 0 && !slices.Contains(conditions.Options.Protocols, params.Protocol) {
  316. return false
  317. }
  318. if params.Event == operationUpload || params.Event == operationDownload {
  319. if conditions.Options.MinFileSize > 0 {
  320. if params.FileSize < conditions.Options.MinFileSize {
  321. return false
  322. }
  323. }
  324. if conditions.Options.MaxFileSize > 0 {
  325. if params.FileSize > conditions.Options.MaxFileSize {
  326. return false
  327. }
  328. }
  329. }
  330. return true
  331. }
  332. // hasFsRules returns true if there are any rules for filesystem event triggers
  333. func (r *eventRulesContainer) hasFsRules() bool {
  334. r.RLock()
  335. defer r.RUnlock()
  336. return len(r.FsEvents) > 0
  337. }
  338. // handleFsEvent executes the rules actions defined for the specified event.
  339. // The boolean parameter indicates whether a sync action was executed
  340. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  341. if params.Protocol == protocolEventAction {
  342. return false, nil
  343. }
  344. r.RLock()
  345. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  346. for _, rule := range r.FsEvents {
  347. if r.checkFsEventMatch(&rule.Conditions, &params) {
  348. if err := rule.CheckActionsConsistency(""); err != nil {
  349. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  350. rule.Name, err, params.Event)
  351. continue
  352. }
  353. hasSyncActions := false
  354. for _, action := range rule.Actions {
  355. if action.Options.ExecuteSync {
  356. hasSyncActions = true
  357. break
  358. }
  359. }
  360. if hasSyncActions {
  361. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  362. } else {
  363. rulesAsync = append(rulesAsync, rule)
  364. }
  365. }
  366. }
  367. r.RUnlock()
  368. params.sender = params.Name
  369. params.addUID()
  370. if len(rulesAsync) > 0 {
  371. go executeAsyncRulesActions(rulesAsync, params)
  372. }
  373. if len(rulesWithSyncActions) > 0 {
  374. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  375. }
  376. return false, nil
  377. }
  378. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  379. *dataprovider.Admin, error,
  380. ) {
  381. r.RLock()
  382. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  383. for _, rule := range r.IPDLoginEvents {
  384. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  385. if err := rule.CheckActionsConsistency(""); err != nil {
  386. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  387. rule.Name, err, params.Event)
  388. continue
  389. }
  390. hasSyncActions := false
  391. for _, action := range rule.Actions {
  392. if action.Options.ExecuteSync {
  393. hasSyncActions = true
  394. break
  395. }
  396. }
  397. if hasSyncActions {
  398. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  399. } else {
  400. rulesAsync = append(rulesAsync, rule)
  401. }
  402. }
  403. }
  404. r.RUnlock()
  405. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  406. return nil, nil, nil
  407. }
  408. params.addIDPCustomFields(customFields)
  409. if len(rulesWithSyncActions) > 1 {
  410. var ruleNames []string
  411. for _, r := range rulesWithSyncActions {
  412. ruleNames = append(ruleNames, r.Name)
  413. }
  414. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  415. }
  416. params.addUID()
  417. if len(rulesAsync) > 0 {
  418. go executeAsyncRulesActions(rulesAsync, params)
  419. }
  420. if len(rulesWithSyncActions) > 0 {
  421. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  422. }
  423. return nil, nil, nil
  424. }
  425. // username is populated for user objects
  426. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  427. r.RLock()
  428. defer r.RUnlock()
  429. var rules []dataprovider.EventRule
  430. for _, rule := range r.ProviderEvents {
  431. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  432. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  433. rules = append(rules, rule)
  434. } else {
  435. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  436. rule.Name, err, params.Event, params.ObjectType)
  437. }
  438. }
  439. }
  440. if len(rules) > 0 {
  441. params.sender = params.ObjectName
  442. go executeAsyncRulesActions(rules, params)
  443. }
  444. }
  445. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  446. r.RLock()
  447. defer r.RUnlock()
  448. if len(r.IPBlockedEvents) == 0 {
  449. return
  450. }
  451. var rules []dataprovider.EventRule
  452. for _, rule := range r.IPBlockedEvents {
  453. if err := rule.CheckActionsConsistency(""); err == nil {
  454. rules = append(rules, rule)
  455. } else {
  456. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  457. rule.Name, err, params.Event)
  458. }
  459. }
  460. if len(rules) > 0 {
  461. go executeAsyncRulesActions(rules, params)
  462. }
  463. }
  464. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  465. r.RLock()
  466. defer r.RUnlock()
  467. if len(r.CertificateEvents) == 0 {
  468. return
  469. }
  470. var rules []dataprovider.EventRule
  471. for _, rule := range r.CertificateEvents {
  472. if err := rule.CheckActionsConsistency(""); err == nil {
  473. rules = append(rules, rule)
  474. } else {
  475. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  476. rule.Name, err, params.Event)
  477. }
  478. }
  479. if len(rules) > 0 {
  480. go executeAsyncRulesActions(rules, params)
  481. }
  482. }
  483. type executedRetentionCheck struct {
  484. Username string
  485. ActionName string
  486. Results []folderRetentionCheckResult
  487. }
  488. // EventParams defines the supported event parameters
  489. type EventParams struct {
  490. Name string
  491. Groups []sdk.GroupMapping
  492. Event string
  493. Status int
  494. VirtualPath string
  495. FsPath string
  496. VirtualTargetPath string
  497. FsTargetPath string
  498. ObjectName string
  499. Extension string
  500. ObjectType string
  501. FileSize int64
  502. Elapsed int64
  503. Protocol string
  504. IP string
  505. Role string
  506. Email string
  507. Timestamp int64
  508. UID string
  509. IDPCustomFields *map[string]string
  510. Object plugin.Renderer
  511. Metadata map[string]string
  512. sender string
  513. updateStatusFromError bool
  514. errors []string
  515. retentionChecks []executedRetentionCheck
  516. }
  517. func (p *EventParams) getACopy() *EventParams {
  518. params := *p
  519. params.errors = make([]string, len(p.errors))
  520. copy(params.errors, p.errors)
  521. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  522. for _, c := range p.retentionChecks {
  523. executedCheck := executedRetentionCheck{
  524. Username: c.Username,
  525. ActionName: c.ActionName,
  526. }
  527. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  528. copy(executedCheck.Results, c.Results)
  529. retentionChecks = append(retentionChecks, executedCheck)
  530. }
  531. params.retentionChecks = retentionChecks
  532. if p.IDPCustomFields != nil {
  533. fields := make(map[string]string)
  534. for k, v := range *p.IDPCustomFields {
  535. fields[k] = v
  536. }
  537. params.IDPCustomFields = &fields
  538. }
  539. if len(params.Metadata) > 0 {
  540. metadata := make(map[string]string)
  541. for k, v := range p.Metadata {
  542. metadata[k] = v
  543. }
  544. params.Metadata = metadata
  545. }
  546. return &params
  547. }
  548. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  549. if customFields == nil || len(*customFields) == 0 {
  550. return
  551. }
  552. fields := make(map[string]string)
  553. for k, v := range *customFields {
  554. switch val := v.(type) {
  555. case string:
  556. fields[k] = val
  557. }
  558. }
  559. p.IDPCustomFields = &fields
  560. }
  561. // AddError adds a new error to the event params and update the status if needed
  562. func (p *EventParams) AddError(err error) {
  563. if err == nil {
  564. return
  565. }
  566. if p.updateStatusFromError && p.Status == 1 {
  567. p.Status = 2
  568. }
  569. p.errors = append(p.errors, err.Error())
  570. }
  571. func (p *EventParams) addUID() {
  572. if p.UID == "" {
  573. p.UID = util.GenerateUniqueID()
  574. }
  575. }
  576. func (p *EventParams) setBackupParams(backupPath string) {
  577. if p.sender != "" {
  578. return
  579. }
  580. p.sender = dataprovider.ActionExecutorSystem
  581. p.FsPath = backupPath
  582. p.ObjectName = filepath.Base(backupPath)
  583. p.VirtualPath = "/" + p.ObjectName
  584. p.Timestamp = time.Now().UnixNano()
  585. info, err := os.Stat(backupPath)
  586. if err == nil {
  587. p.FileSize = info.Size()
  588. }
  589. }
  590. func (p *EventParams) getStatusString() string {
  591. switch p.Status {
  592. case 1:
  593. return "OK"
  594. default:
  595. return "KO"
  596. }
  597. }
  598. // getUsers returns users with group settings not applied
  599. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  600. if p.sender == "" {
  601. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeUsers})
  602. if err != nil {
  603. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  604. return nil, errors.New("unable to get users")
  605. }
  606. return dump.Users, nil
  607. }
  608. user, err := p.getUserFromSender()
  609. if err != nil {
  610. return nil, err
  611. }
  612. return []dataprovider.User{user}, nil
  613. }
  614. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  615. if p.sender == dataprovider.ActionExecutorSystem {
  616. return dataprovider.User{
  617. BaseUser: sdk.BaseUser{
  618. Status: 1,
  619. Username: p.sender,
  620. HomeDir: dataprovider.GetBackupsPath(),
  621. Permissions: map[string][]string{
  622. "/": {dataprovider.PermAny},
  623. },
  624. },
  625. }, nil
  626. }
  627. user, err := dataprovider.UserExists(p.sender, "")
  628. if err != nil {
  629. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  630. return user, fmt.Errorf("error getting user %q", p.sender)
  631. }
  632. return user, nil
  633. }
  634. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  635. if p.sender == "" {
  636. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeFolders})
  637. return dump.Folders, err
  638. }
  639. folder, err := dataprovider.GetFolderByName(p.sender)
  640. if err != nil {
  641. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  642. }
  643. return []vfs.BaseVirtualFolder{folder}, nil
  644. }
  645. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  646. if len(p.retentionChecks) == 0 {
  647. return nil, errors.New("no data retention report available")
  648. }
  649. var b bytes.Buffer
  650. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  651. return nil, err
  652. }
  653. return b.Bytes(), nil
  654. }
  655. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  656. var n int64
  657. wr := zip.NewWriter(w)
  658. for _, check := range p.retentionChecks {
  659. data, err := getCSVRetentionReport(check.Results)
  660. if err != nil {
  661. return n, fmt.Errorf("unable to get CSV report: %w", err)
  662. }
  663. dataSize := int64(len(data))
  664. n += dataSize
  665. // we suppose a 3:1 compression ratio
  666. if n > (maxAttachmentsSize * 3) {
  667. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  668. util.ByteCountIEC(n))
  669. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  670. }
  671. fh := &zip.FileHeader{
  672. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  673. Method: zip.Deflate,
  674. Modified: time.Now().UTC(),
  675. }
  676. f, err := wr.CreateHeader(fh)
  677. if err != nil {
  678. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  679. }
  680. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  681. if err != nil {
  682. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  683. }
  684. }
  685. if err := wr.Close(); err != nil {
  686. return n, fmt.Errorf("unable to close zip writer: %w", err)
  687. }
  688. return n, nil
  689. }
  690. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  691. if len(p.retentionChecks) == 0 {
  692. return nil, errors.New("no data retention report available")
  693. }
  694. return &mail.File{
  695. Name: "retention-reports.zip",
  696. Header: make(map[string][]string),
  697. Writer: p.writeCompressedDataRetentionReports,
  698. }, nil
  699. }
  700. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  701. if jsonEscaped {
  702. return util.JSONEscape(val)
  703. }
  704. return val
  705. }
  706. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  707. replacements := []string{
  708. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  709. "{{Event}}", p.Event,
  710. "{{Status}}", fmt.Sprintf("%d", p.Status),
  711. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  712. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  713. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  714. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  715. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  716. "{{ObjectType}}", p.ObjectType,
  717. "{{FileSize}}", strconv.FormatInt(p.FileSize, 10),
  718. "{{Elapsed}}", strconv.FormatInt(p.Elapsed, 10),
  719. "{{Protocol}}", p.Protocol,
  720. "{{IP}}", p.IP,
  721. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  722. "{{Email}}", p.getStringReplacement(p.Email, jsonEscaped),
  723. "{{Timestamp}}", strconv.FormatInt(p.Timestamp, 10),
  724. "{{StatusString}}", p.getStatusString(),
  725. "{{UID}}", p.getStringReplacement(p.UID, jsonEscaped),
  726. "{{Ext}}", p.getStringReplacement(p.Extension, jsonEscaped),
  727. }
  728. if p.VirtualPath != "" {
  729. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  730. }
  731. if p.VirtualTargetPath != "" {
  732. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  733. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  734. }
  735. if len(p.errors) > 0 {
  736. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  737. } else {
  738. replacements = append(replacements, "{{ErrorString}}", "")
  739. }
  740. replacements = append(replacements, objDataPlaceholder, "{}")
  741. replacements = append(replacements, objDataPlaceholderString, "")
  742. if addObjectData {
  743. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  744. if err == nil {
  745. dataString := util.BytesToString(data)
  746. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  747. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  748. }
  749. }
  750. if p.IDPCustomFields != nil {
  751. for k, v := range *p.IDPCustomFields {
  752. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  753. }
  754. }
  755. replacements = append(replacements, "{{Metadata}}", "{}")
  756. replacements = append(replacements, "{{MetadataString}}", "")
  757. if len(p.Metadata) > 0 {
  758. data, err := json.Marshal(p.Metadata)
  759. if err == nil {
  760. dataString := util.BytesToString(data)
  761. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  762. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  763. }
  764. }
  765. return replacements
  766. }
  767. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  768. var b bytes.Buffer
  769. csvWriter := csv.NewWriter(&b)
  770. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  771. "elapsed (ms)", "info", "error"})
  772. if err != nil {
  773. return nil, err
  774. }
  775. for _, result := range results {
  776. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  777. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  778. result.Info, result.Error})
  779. if err != nil {
  780. return nil, err
  781. }
  782. }
  783. csvWriter.Flush()
  784. err = csvWriter.Error()
  785. return b.Bytes(), err
  786. }
// closeWriterAndUpdateQuota closes w, updates the quota for the written file
// and executes the action notification for operation.
//
// The written file is virtualTargetPath if set, virtualSourcePath otherwise.
// If a target path is set and errTransfer is not nil, the removal of the
// partial target file is attempted. The quota is updated with the size
// reported by stat minus truncatedSize and with numFiles as number of files.
//
// It returns errTransfer if not nil, otherwise the error returned closing w.
func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
	numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
) error {
	var fsDstPath string
	var errDstFs error
	// the writer is always closed, the close error is evaluated later
	errWrite := w.Close()
	targetPath := virtualSourcePath
	if virtualTargetPath != "" {
		targetPath = virtualTargetPath
		var fsDst vfs.Fs
		fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
		if errTransfer != nil && errDstFs == nil {
			// try to remove a partial file on error. If this fails, we can't do anything
			errRemove := fsDst.Remove(fsDstPath, false)
			conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
		}
	}
	info, err := conn.doStatInternal(targetPath, 0, false, false)
	if err == nil {
		updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
		var fsSrcPath string
		var errSrcFs error
		if virtualSourcePath != "" {
			_, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
		}
		// log and notify only if both paths were resolved
		if errSrcFs == nil && errDstFs == nil {
			// elapsed time in milliseconds
			elapsed := time.Since(startTime).Nanoseconds() / 1000000
			// a close error is reported as transfer error if the transfer itself succeeded
			if errTransfer == nil {
				errTransfer = errWrite
			}
			if operation == operationCopy {
				logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
					"", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
			}
			ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed, nil) //nolint:errcheck
		}
	} else {
		// the file cannot be stat'ed: quota update and notification are skipped
		eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
	}
	if errTransfer != nil {
		return errTransfer
	}
	return errWrite
}
  831. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  832. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  833. if err != nil {
  834. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  835. return
  836. }
  837. dataprovider.UpdateUserFolderQuota(&vfolder, &conn.User, numFiles, fileSize, false)
  838. }
  839. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  840. if numFiles == 0 {
  841. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  842. return conn.GetPermissionDeniedError()
  843. }
  844. } else {
  845. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  846. return conn.GetPermissionDeniedError()
  847. }
  848. }
  849. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  850. if !q.HasSpace {
  851. return conn.GetQuotaExceededError()
  852. }
  853. if expectedSize != -1 {
  854. sizeDiff := expectedSize - truncatedSize
  855. if sizeDiff > 0 {
  856. remainingSize := q.GetRemainingSize()
  857. if remainingSize > 0 && remainingSize < sizeDiff {
  858. return conn.GetQuotaExceededError()
  859. }
  860. }
  861. }
  862. return nil
  863. }
// getFileWriter opens virtualPath for writing after checking permissions and
// quota.
//
// It returns, in order: the writer, the number of new files (1 if the path
// does not exist, 0 otherwise), the size to subtract when the quota is
// updated after the write, a cancel function and an error. On success the
// cancel function is never nil.
func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
	fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
	if err != nil {
		return nil, 0, 0, nil, err
	}
	var truncatedSize, fileSize int64
	numFiles := 1
	isFileOverwrite := false
	info, err := fs.Lstat(fsPath)
	if err == nil {
		fileSize = info.Size()
		if info.IsDir() {
			return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
		}
		// only overwriting a regular file frees the size of the existing one
		if info.Mode().IsRegular() {
			isFileOverwrite = true
			truncatedSize = fileSize
		}
		// the path already exists, no new file is added
		numFiles = 0
	}
	// a not exist error is expected for new files, any other error is fatal
	if err != nil && !fs.IsNotExist(err) {
		return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
	}
	if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
		return nil, numFiles, truncatedSize, nil, err
	}
	f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1, false))
	if err != nil {
		return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
	}
	vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
	if isFileOverwrite {
		if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
			// the size of the existing file is subtracted from the quota now,
			// so nothing has to be subtracted after the write
			updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
			truncatedSize = 0
		}
	}
	if cancelFn == nil {
		cancelFn = func() {}
	}
	// Create returns two candidate writers, the first one is used if not nil
	if f != nil {
		return f, numFiles, truncatedSize, cancelFn, nil
	}
	return w, numFiles, truncatedSize, cancelFn, nil
}
// addZipEntry adds entryPath to the zip archive wrapped by wr. The entry name
// is entryPath relative to baseDir.
//
// Directories are added recursively: recursion is the current depth and an
// error is returned once it reaches util.MaxRecursion. The archive itself,
// entries whose name was already added and non regular files are skipped
// without returning an error.
func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string, recursion int) error {
	if entryPath == wr.Name {
		// skip the archive itself
		return nil
	}
	if recursion >= util.MaxRecursion {
		eventManagerLog(logger.LevelError, "unable to add zip entry %q, recursion too deep: %v", entryPath, recursion)
		return util.ErrRecursionTooDeep
	}
	recursion++
	info, err := conn.DoStat(entryPath, 1, false)
	if err != nil {
		eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
		return err
	}
	entryName, err := getZipEntryName(entryPath, baseDir)
	if err != nil {
		eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
		return err
	}
	if _, ok := wr.Entries[entryName]; ok {
		eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
		return nil
	}
	// remember the entry name so that it is not added twice
	wr.Entries[entryName] = true
	if info.IsDir() {
		// directories are stored with a trailing slash
		_, err = wr.Writer.CreateHeader(&zip.FileHeader{
			Name:     entryName + "/",
			Method:   zip.Deflate,
			Modified: info.ModTime(),
		})
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
			return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
		}
		lister, err := conn.ListDir(entryPath)
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to add zip entry %q, get dir lister error: %v", entryPath, err)
			return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
		}
		defer lister.Close()
		// read the directory contents in batches and add each item
		for {
			contents, err := lister.Next(vfs.ListerBatchSize)
			// io.EOF marks the last batch, its contents are still processed below
			finished := errors.Is(err, io.EOF)
			if err := lister.convertError(err); err != nil {
				eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
				return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
			}
			for _, info := range contents {
				fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
				if err := addZipEntry(wr, conn, fullPath, baseDir, recursion); err != nil {
					eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
					return err
				}
			}
			if finished {
				return nil
			}
		}
	}
	if !info.Mode().IsRegular() {
		// we only allow regular files
		eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
		return nil
	}
	return addFileToZip(wr, conn, entryPath, entryName, info.ModTime())
}
  976. func addFileToZip(wr *zipWriterWrapper, conn *BaseConnection, entryPath, entryName string, modTime time.Time) error {
  977. reader, cancelFn, err := getFileReader(conn, entryPath)
  978. if err != nil {
  979. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  980. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  981. }
  982. defer cancelFn()
  983. defer reader.Close()
  984. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  985. Name: entryName,
  986. Method: zip.Deflate,
  987. Modified: modTime,
  988. })
  989. if err != nil {
  990. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  991. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  992. }
  993. _, err = io.Copy(f, reader)
  994. return err
  995. }
  996. func getZipEntryName(entryPath, baseDir string) (string, error) {
  997. if !strings.HasPrefix(entryPath, baseDir) {
  998. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  999. }
  1000. entryPath = strings.TrimPrefix(entryPath, baseDir)
  1001. return strings.TrimPrefix(entryPath, "/"), nil
  1002. }
  1003. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  1004. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  1005. return nil, nil, conn.GetPermissionDeniedError()
  1006. }
  1007. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  1008. if err != nil {
  1009. return nil, nil, err
  1010. }
  1011. f, r, cancelFn, err := fs.Open(fsPath, 0)
  1012. if err != nil {
  1013. return nil, nil, conn.GetFsError(fs, err)
  1014. }
  1015. if cancelFn == nil {
  1016. cancelFn = func() {}
  1017. }
  1018. if f != nil {
  1019. return f, cancelFn, nil
  1020. }
  1021. return r, cancelFn, nil
  1022. }
  1023. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  1024. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1025. if err != nil {
  1026. return err
  1027. }
  1028. defer cancelFn()
  1029. defer reader.Close()
  1030. _, err = io.Copy(w, reader)
  1031. return err
  1032. }
  1033. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  1034. return func(w io.Writer) (int64, error) {
  1035. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1036. if err != nil {
  1037. return 0, err
  1038. }
  1039. defer cancelFn()
  1040. defer reader.Close()
  1041. return io.CopyN(w, reader, size)
  1042. }
  1043. }
  1044. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  1045. var files []*mail.File
  1046. totalSize := int64(0)
  1047. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  1048. info, err := conn.DoStat(virtualPath, 0, false)
  1049. if err != nil {
  1050. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  1051. }
  1052. if !info.Mode().IsRegular() {
  1053. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  1054. }
  1055. totalSize += info.Size()
  1056. if totalSize > maxAttachmentsSize {
  1057. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  1058. }
  1059. files = append(files, &mail.File{
  1060. Name: path.Base(virtualPath),
  1061. Header: make(map[string][]string),
  1062. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1063. })
  1064. }
  1065. return files, nil
  1066. }
  1067. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  1068. if !strings.Contains(input, "{{") {
  1069. return input
  1070. }
  1071. return replacer.Replace(input)
  1072. }
  1073. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1074. var matched bool
  1075. var err error
  1076. if strings.Contains(p.Pattern, "**") {
  1077. matched, err = doublestar.Match(p.Pattern, name)
  1078. } else {
  1079. matched, err = path.Match(p.Pattern, name)
  1080. }
  1081. if err != nil {
  1082. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1083. return false
  1084. }
  1085. if p.InverseMatch {
  1086. return !matched
  1087. }
  1088. return matched
  1089. }
  1090. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1091. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1092. return false
  1093. }
  1094. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1095. return false
  1096. }
  1097. if !checkEventGroupConditionPatterns(user.Groups, conditions.GroupNames) {
  1098. return false
  1099. }
  1100. return true
  1101. }
  1102. // checkEventConditionPatterns returns false if patterns are defined and no match is found
  1103. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1104. if len(patterns) == 0 {
  1105. return true
  1106. }
  1107. matches := false
  1108. for _, p := range patterns {
  1109. // assume, that multiple InverseMatches are set
  1110. if p.InverseMatch {
  1111. if checkEventConditionPattern(p, name) {
  1112. matches = true
  1113. } else {
  1114. return false
  1115. }
  1116. } else if checkEventConditionPattern(p, name) {
  1117. return true
  1118. }
  1119. }
  1120. return matches
  1121. }
  1122. func checkEventGroupConditionPatterns(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1123. if len(patterns) == 0 {
  1124. return true
  1125. }
  1126. matches := false
  1127. for _, group := range groups {
  1128. for _, p := range patterns {
  1129. // assume, that multiple InverseMatches are set
  1130. if p.InverseMatch {
  1131. if checkEventConditionPattern(p, group.Name) {
  1132. matches = true
  1133. } else {
  1134. return false
  1135. }
  1136. } else {
  1137. if checkEventConditionPattern(p, group.Name) {
  1138. return true
  1139. }
  1140. }
  1141. }
  1142. }
  1143. return matches
  1144. }
  1145. func getHTTPRuleActionEndpoint(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1146. u, err := url.Parse(c.Endpoint)
  1147. if err != nil {
  1148. return "", fmt.Errorf("invalid endpoint: %w", err)
  1149. }
  1150. if strings.Contains(u.Path, "{{") {
  1151. pathComponents := strings.Split(u.Path, "/")
  1152. for idx := range pathComponents {
  1153. part := replaceWithReplacer(pathComponents[idx], replacer)
  1154. if part != pathComponents[idx] {
  1155. pathComponents[idx] = url.PathEscape(part)
  1156. }
  1157. }
  1158. u.Path = ""
  1159. u = u.JoinPath(pathComponents...)
  1160. }
  1161. if len(c.QueryParameters) > 0 {
  1162. q := u.Query()
  1163. for _, keyVal := range c.QueryParameters {
  1164. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1165. }
  1166. u.RawQuery = q.Encode()
  1167. }
  1168. return u.String(), nil
  1169. }
  1170. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1171. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1172. ) error {
  1173. partWriter, err := m.CreatePart(h)
  1174. if err != nil {
  1175. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1176. return err
  1177. }
  1178. if part.Body != "" {
  1179. cType := h.Get("Content-Type")
  1180. if strings.Contains(strings.ToLower(cType), "application/json") {
  1181. replacements := params.getStringReplacements(addObjectData, true)
  1182. jsonReplacer := strings.NewReplacer(replacements...)
  1183. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, jsonReplacer)))
  1184. } else {
  1185. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, replacer)))
  1186. }
  1187. if err != nil {
  1188. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1189. return err
  1190. }
  1191. return nil
  1192. }
  1193. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1194. data, err := params.getCompressedDataRetentionReport()
  1195. if err != nil {
  1196. return err
  1197. }
  1198. _, err = partWriter.Write(data)
  1199. if err != nil {
  1200. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1201. return err
  1202. }
  1203. return nil
  1204. }
  1205. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1206. if err != nil {
  1207. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1208. return err
  1209. }
  1210. return nil
  1211. }
// getHTTPRuleActionBody returns the request body and the content type to use
// for the HTTP action.
//
// No body is returned for GET requests. If a body is configured it is used
// after replacing the placeholders, or replaced by the compressed retention
// report if it is equal to the retention report placeholder. In both cases
// the returned content type is empty. If multipart parts are configured, the
// returned reader is the read side of a pipe: the parts are written by a
// separate goroutine and the returned content type is the multipart form
// data one. cancel is called if writing a part fails.
func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
	cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
) (io.Reader, string, error) {
	var body io.Reader
	if c.Method == http.MethodGet {
		return body, "", nil
	}
	if c.Body != "" {
		if c.Body == dataprovider.RetentionReportPlaceHolder {
			data, err := params.getCompressedDataRetentionReport()
			if err != nil {
				return body, "", err
			}
			return bytes.NewBuffer(data), "", nil
		}
		if c.HasJSONBody() {
			// JSON bodies use the JSON escaped replacements
			replacements := params.getStringReplacements(addObjectData, true)
			jsonReplacer := strings.NewReplacer(replacements...)
			return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
		}
		return bytes.NewBufferString(replaceWithReplacer(c.Body, replacer)), "", nil
	}
	if len(c.Parts) > 0 {
		r, w := io.Pipe()
		m := multipart.NewWriter(w)
		var conn *BaseConnection
		// a connection is created only if a user is provided, conn is nil otherwise
		if user.Username != "" {
			var err error
			user, err = getUserForEventAction(user)
			if err != nil {
				return body, "", err
			}
			connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
			err = user.CheckFsRoot(connectionID)
			if err != nil {
				user.CloseFs() //nolint:errcheck
				return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
					user.Username, err)
			}
			conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
		}
		// the parts are written while the returned reader is consumed
		go func() {
			defer w.Close()
			defer user.CloseFs() //nolint:errcheck
			for _, part := range c.Parts {
				h := make(textproto.MIMEHeader)
				if part.Body != "" {
					h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
				} else {
					h.Set("Content-Disposition",
						fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
							multipartQuoteEscaper.Replace(part.Name),
							multipartQuoteEscaper.Replace((path.Base(replaceWithReplacer(part.Filepath, replacer))))))
					// the content type is detected from the extension of the configured
					// file path, before the placeholders are replaced
					contentType := mime.TypeByExtension(path.Ext(part.Filepath))
					if contentType == "" {
						contentType = "application/octet-stream"
					}
					h.Set("Content-Type", contentType)
				}
				// the headers configured for the part override the ones set above
				for _, keyVal := range part.Headers {
					h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
				}
				if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
					// the multipart writer is not closed on this path: the closing boundary is not written
					cancel()
					return
				}
			}
			m.Close()
		}()
		return r, m.FormDataContentType(), nil
	}
	return body, "", nil
}
  1285. func setHTTPReqHeaders(req *http.Request, c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1286. contentType string,
  1287. ) {
  1288. if contentType != "" {
  1289. req.Header.Set("Content-Type", contentType)
  1290. }
  1291. if c.Username != "" || c.Password.GetPayload() != "" {
  1292. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1293. }
  1294. for _, keyVal := range c.Headers {
  1295. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1296. }
  1297. }
  1298. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1299. if err := c.TryDecryptPassword(); err != nil {
  1300. return err
  1301. }
  1302. addObjectData := false
  1303. if params.Object != nil {
  1304. addObjectData = c.HasObjectData()
  1305. }
  1306. replacements := params.getStringReplacements(addObjectData, false)
  1307. replacer := strings.NewReplacer(replacements...)
  1308. endpoint, err := getHTTPRuleActionEndpoint(&c, replacer)
  1309. if err != nil {
  1310. return err
  1311. }
  1312. ctx, cancel := c.GetContext()
  1313. defer cancel()
  1314. var user dataprovider.User
  1315. if c.HasMultipartFiles() {
  1316. user, err = params.getUserFromSender()
  1317. if err != nil {
  1318. return err
  1319. }
  1320. }
  1321. body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
  1322. if err != nil {
  1323. return err
  1324. }
  1325. if body != nil {
  1326. rc, ok := body.(io.ReadCloser)
  1327. if ok {
  1328. defer rc.Close()
  1329. }
  1330. }
  1331. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1332. if err != nil {
  1333. return err
  1334. }
  1335. setHTTPReqHeaders(req, &c, replacer, contentType)
  1336. client := c.GetHTTPClient()
  1337. defer client.CloseIdleConnections()
  1338. startTime := time.Now()
  1339. resp, err := client.Do(req)
  1340. if err != nil {
  1341. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1342. endpoint, time.Since(startTime), err)
  1343. return fmt.Errorf("error sending HTTP request: %w", err)
  1344. }
  1345. defer resp.Body.Close()
  1346. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1347. endpoint, time.Since(startTime), resp.StatusCode)
  1348. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1349. if rb, err := io.ReadAll(io.LimitReader(resp.Body, 2048)); err == nil {
  1350. eventManagerLog(logger.LevelDebug, "error notification response from endpoint %q: %s",
  1351. endpoint, util.BytesToString(rb))
  1352. }
  1353. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1354. }
  1355. return nil
  1356. }
  1357. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1358. addObjectData := false
  1359. if params.Object != nil {
  1360. for _, k := range c.EnvVars {
  1361. if strings.Contains(k.Value, objDataPlaceholder) || strings.Contains(k.Value, objDataPlaceholderString) {
  1362. addObjectData = true
  1363. break
  1364. }
  1365. }
  1366. }
  1367. replacements := params.getStringReplacements(addObjectData, false)
  1368. replacer := strings.NewReplacer(replacements...)
  1369. args := make([]string, 0, len(c.Args))
  1370. for _, arg := range c.Args {
  1371. args = append(args, replaceWithReplacer(arg, replacer))
  1372. }
  1373. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1374. defer cancel()
  1375. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1376. cmd.Env = []string{}
  1377. for _, keyVal := range c.EnvVars {
  1378. if keyVal.Value == "$" {
  1379. val := os.Getenv(keyVal.Key)
  1380. if val == "" {
  1381. eventManagerLog(logger.LevelDebug, "empty value for environment variable %q", keyVal.Key)
  1382. }
  1383. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, val))
  1384. } else {
  1385. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1386. }
  1387. }
  1388. startTime := time.Now()
  1389. err := cmd.Run()
  1390. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1391. c.Cmd, time.Since(startTime), err)
  1392. return err
  1393. }
  1394. func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) []string {
  1395. if len(addrs) == 0 {
  1396. return nil
  1397. }
  1398. recipients := make([]string, 0, len(addrs))
  1399. for _, recipient := range addrs {
  1400. rcpt := replaceWithReplacer(recipient, replacer)
  1401. if rcpt != "" {
  1402. recipients = append(recipients, rcpt)
  1403. }
  1404. }
  1405. return recipients
  1406. }
  1407. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1408. addObjectData := false
  1409. if params.Object != nil {
  1410. if strings.Contains(c.Body, objDataPlaceholder) || strings.Contains(c.Body, objDataPlaceholderString) {
  1411. addObjectData = true
  1412. }
  1413. }
  1414. replacements := params.getStringReplacements(addObjectData, false)
  1415. replacer := strings.NewReplacer(replacements...)
  1416. body := replaceWithReplacer(c.Body, replacer)
  1417. subject := replaceWithReplacer(c.Subject, replacer)
  1418. recipients := getEmailAddressesWithReplacer(c.Recipients, replacer)
  1419. bcc := getEmailAddressesWithReplacer(c.Bcc, replacer)
  1420. startTime := time.Now()
  1421. var files []*mail.File
  1422. fileAttachments := make([]string, 0, len(c.Attachments))
  1423. for _, attachment := range c.Attachments {
  1424. if attachment == dataprovider.RetentionReportPlaceHolder {
  1425. f, err := params.getRetentionReportsAsMailAttachment()
  1426. if err != nil {
  1427. return err
  1428. }
  1429. files = append(files, f)
  1430. continue
  1431. }
  1432. fileAttachments = append(fileAttachments, attachment)
  1433. }
  1434. if len(fileAttachments) > 0 {
  1435. user, err := params.getUserFromSender()
  1436. if err != nil {
  1437. return err
  1438. }
  1439. user, err = getUserForEventAction(user)
  1440. if err != nil {
  1441. return err
  1442. }
  1443. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1444. err = user.CheckFsRoot(connectionID)
  1445. defer user.CloseFs() //nolint:errcheck
  1446. if err != nil {
  1447. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1448. }
  1449. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1450. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1451. if err != nil {
  1452. return err
  1453. }
  1454. files = append(files, res...)
  1455. }
  1456. err := smtp.SendEmail(recipients, bcc, subject, body, smtp.EmailContentType(c.ContentType), files...)
  1457. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1458. time.Since(startTime), err)
  1459. if err != nil {
  1460. return fmt.Errorf("unable to send email: %w", err)
  1461. }
  1462. return nil
  1463. }
  1464. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1465. err := user.LoadAndApplyGroupSettings()
  1466. if err != nil {
  1467. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1468. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1469. }
  1470. user.UploadDataTransfer = 0
  1471. user.UploadBandwidth = 0
  1472. user.DownloadBandwidth = 0
  1473. user.Filters.DisableFsChecks = false
  1474. user.Filters.FilePatterns = nil
  1475. user.Filters.BandwidthLimits = nil
  1476. for k := range user.Permissions {
  1477. user.Permissions[k] = []string{dataprovider.PermAny}
  1478. }
  1479. return user, nil
  1480. }
  1481. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1482. results := make([]string, 0, len(paths))
  1483. for _, p := range paths {
  1484. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1485. }
  1486. return util.RemoveDuplicates(results, false)
  1487. }
  1488. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1489. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1490. if err != nil {
  1491. return err
  1492. }
  1493. return conn.RemoveFile(fs, fsPath, item, info)
  1494. }
  1495. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1496. user, err := getUserForEventAction(user)
  1497. if err != nil {
  1498. return err
  1499. }
  1500. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1501. err = user.CheckFsRoot(connectionID)
  1502. defer user.CloseFs() //nolint:errcheck
  1503. if err != nil {
  1504. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1505. }
  1506. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1507. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1508. info, err := conn.DoStat(item, 0, false)
  1509. if err != nil {
  1510. if conn.IsNotExistError(err) {
  1511. continue
  1512. }
  1513. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1514. }
  1515. if info.IsDir() {
  1516. if err = conn.RemoveDir(item); err != nil {
  1517. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1518. }
  1519. } else {
  1520. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1521. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1522. }
  1523. }
  1524. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1525. }
  1526. return nil
  1527. }
  1528. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1529. conditions dataprovider.ConditionOptions, params *EventParams,
  1530. ) error {
  1531. users, err := params.getUsers()
  1532. if err != nil {
  1533. return fmt.Errorf("unable to get users: %w", err)
  1534. }
  1535. var failures []string
  1536. executed := 0
  1537. for _, user := range users {
  1538. // if sender is set, the conditions have already been evaluated
  1539. if params.sender == "" {
  1540. if !checkUserConditionOptions(&user, &conditions) {
  1541. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1542. user.Username)
  1543. continue
  1544. }
  1545. }
  1546. executed++
  1547. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1548. params.AddError(err)
  1549. failures = append(failures, user.Username)
  1550. }
  1551. }
  1552. if len(failures) > 0 {
  1553. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1554. }
  1555. if executed == 0 {
  1556. eventManagerLog(logger.LevelError, "no delete executed")
  1557. return errors.New("no delete executed")
  1558. }
  1559. return nil
  1560. }
  1561. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1562. user, err := getUserForEventAction(user)
  1563. if err != nil {
  1564. return err
  1565. }
  1566. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1567. err = user.CheckFsRoot(connectionID)
  1568. defer user.CloseFs() //nolint:errcheck
  1569. if err != nil {
  1570. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1571. }
  1572. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1573. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1574. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1575. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1576. }
  1577. if err = conn.createDirIfMissing(item); err != nil {
  1578. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1579. }
  1580. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1581. }
  1582. return nil
  1583. }
  1584. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1585. conditions dataprovider.ConditionOptions, params *EventParams,
  1586. ) error {
  1587. users, err := params.getUsers()
  1588. if err != nil {
  1589. return fmt.Errorf("unable to get users: %w", err)
  1590. }
  1591. var failures []string
  1592. executed := 0
  1593. for _, user := range users {
  1594. // if sender is set, the conditions have already been evaluated
  1595. if params.sender == "" {
  1596. if !checkUserConditionOptions(&user, &conditions) {
  1597. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1598. user.Username)
  1599. continue
  1600. }
  1601. }
  1602. executed++
  1603. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1604. failures = append(failures, user.Username)
  1605. }
  1606. }
  1607. if len(failures) > 0 {
  1608. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1609. }
  1610. if executed == 0 {
  1611. eventManagerLog(logger.LevelError, "no mkdir executed")
  1612. return errors.New("no mkdir executed")
  1613. }
  1614. return nil
  1615. }
  1616. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1617. user dataprovider.User,
  1618. ) error {
  1619. user, err := getUserForEventAction(user)
  1620. if err != nil {
  1621. return err
  1622. }
  1623. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1624. err = user.CheckFsRoot(connectionID)
  1625. defer user.CloseFs() //nolint:errcheck
  1626. if err != nil {
  1627. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1628. }
  1629. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1630. for _, item := range renames {
  1631. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1632. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1633. if err = conn.renameInternal(source, target, true); err != nil {
  1634. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1635. }
  1636. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1637. }
  1638. return nil
  1639. }
  1640. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1641. user dataprovider.User,
  1642. ) error {
  1643. user, err := getUserForEventAction(user)
  1644. if err != nil {
  1645. return err
  1646. }
  1647. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1648. err = user.CheckFsRoot(connectionID)
  1649. defer user.CloseFs() //nolint:errcheck
  1650. if err != nil {
  1651. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1652. }
  1653. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1654. for _, item := range copy {
  1655. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1656. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1657. if strings.HasSuffix(item.Key, "/") {
  1658. source += "/"
  1659. }
  1660. if strings.HasSuffix(item.Value, "/") {
  1661. target += "/"
  1662. }
  1663. if err = conn.Copy(source, target); err != nil {
  1664. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1665. }
  1666. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1667. }
  1668. return nil
  1669. }
  1670. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1671. user dataprovider.User,
  1672. ) error {
  1673. user, err := getUserForEventAction(user)
  1674. if err != nil {
  1675. return err
  1676. }
  1677. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1678. err = user.CheckFsRoot(connectionID)
  1679. defer user.CloseFs() //nolint:errcheck
  1680. if err != nil {
  1681. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1682. }
  1683. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1684. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1685. if _, err = conn.DoStat(item, 0, false); err != nil {
  1686. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1687. }
  1688. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1689. }
  1690. return nil
  1691. }
  1692. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1693. conditions dataprovider.ConditionOptions, params *EventParams,
  1694. ) error {
  1695. users, err := params.getUsers()
  1696. if err != nil {
  1697. return fmt.Errorf("unable to get users: %w", err)
  1698. }
  1699. var failures []string
  1700. executed := 0
  1701. for _, user := range users {
  1702. // if sender is set, the conditions have already been evaluated
  1703. if params.sender == "" {
  1704. if !checkUserConditionOptions(&user, &conditions) {
  1705. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1706. user.Username)
  1707. continue
  1708. }
  1709. }
  1710. executed++
  1711. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1712. failures = append(failures, user.Username)
  1713. params.AddError(err)
  1714. }
  1715. }
  1716. if len(failures) > 0 {
  1717. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1718. }
  1719. if executed == 0 {
  1720. eventManagerLog(logger.LevelError, "no rename executed")
  1721. return errors.New("no rename executed")
  1722. }
  1723. return nil
  1724. }
  1725. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1726. conditions dataprovider.ConditionOptions, params *EventParams,
  1727. ) error {
  1728. users, err := params.getUsers()
  1729. if err != nil {
  1730. return fmt.Errorf("unable to get users: %w", err)
  1731. }
  1732. var failures []string
  1733. var executed int
  1734. for _, user := range users {
  1735. // if sender is set, the conditions have already been evaluated
  1736. if params.sender == "" {
  1737. if !checkUserConditionOptions(&user, &conditions) {
  1738. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1739. user.Username)
  1740. continue
  1741. }
  1742. }
  1743. executed++
  1744. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1745. failures = append(failures, user.Username)
  1746. params.AddError(err)
  1747. }
  1748. }
  1749. if len(failures) > 0 {
  1750. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1751. }
  1752. if executed == 0 {
  1753. eventManagerLog(logger.LevelError, "no copy executed")
  1754. return errors.New("no copy executed")
  1755. }
  1756. return nil
  1757. }
  1758. func getArchiveBaseDir(paths []string) string {
  1759. var parentDirs []string
  1760. for _, p := range paths {
  1761. parentDirs = append(parentDirs, path.Dir(p))
  1762. }
  1763. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1764. baseDir := "/"
  1765. if len(parentDirs) == 1 {
  1766. baseDir = parentDirs[0]
  1767. }
  1768. return baseDir
  1769. }
  1770. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1771. if info.IsDir() {
  1772. var dirSize int64
  1773. lister, err := conn.ListDir(p)
  1774. if err != nil {
  1775. return 0, err
  1776. }
  1777. defer lister.Close()
  1778. for {
  1779. entries, err := lister.Next(vfs.ListerBatchSize)
  1780. finished := errors.Is(err, io.EOF)
  1781. if err != nil && !finished {
  1782. return 0, err
  1783. }
  1784. for _, entry := range entries {
  1785. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1786. if err != nil {
  1787. return 0, err
  1788. }
  1789. dirSize += size
  1790. }
  1791. if finished {
  1792. return dirSize, nil
  1793. }
  1794. }
  1795. }
  1796. if info.Mode().IsRegular() {
  1797. return info.Size(), nil
  1798. }
  1799. return 0, nil
  1800. }
  1801. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1802. q, _ := conn.HasSpace(false, false, zipPath)
  1803. if q.HasSpace && q.GetRemainingSize() > 0 {
  1804. var size int64
  1805. for _, item := range paths {
  1806. info, err := conn.DoStat(item, 1, false)
  1807. if err != nil {
  1808. return size, err
  1809. }
  1810. itemSize, err := getSizeForPath(conn, item, info)
  1811. if err != nil {
  1812. return size, err
  1813. }
  1814. size += itemSize
  1815. }
  1816. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1817. // we assume the zip size will be half of the real size
  1818. return size / 2, nil
  1819. }
  1820. return -1, nil
  1821. }
  1822. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1823. user dataprovider.User,
  1824. ) error {
  1825. user, err := getUserForEventAction(user)
  1826. if err != nil {
  1827. return err
  1828. }
  1829. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1830. err = user.CheckFsRoot(connectionID)
  1831. defer user.CloseFs() //nolint:errcheck
  1832. if err != nil {
  1833. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1834. }
  1835. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1836. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1837. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1838. paths := make([]string, 0, len(c.Paths))
  1839. for idx := range c.Paths {
  1840. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1841. if p == name {
  1842. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1843. }
  1844. paths = append(paths, p)
  1845. }
  1846. paths = util.RemoveDuplicates(paths, false)
  1847. estimatedSize, err := estimateZipSize(conn, name, paths)
  1848. if err != nil {
  1849. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1850. return fmt.Errorf("unable to estimate archive size: %w", err)
  1851. }
  1852. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1853. if err != nil {
  1854. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1855. return fmt.Errorf("unable to create archive: %w", err)
  1856. }
  1857. defer cancelFn()
  1858. baseDir := getArchiveBaseDir(paths)
  1859. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1860. zipWriter := &zipWriterWrapper{
  1861. Name: name,
  1862. Writer: zip.NewWriter(writer),
  1863. Entries: make(map[string]bool),
  1864. }
  1865. startTime := time.Now()
  1866. for _, item := range paths {
  1867. if err := addZipEntry(zipWriter, conn, item, baseDir, 0); err != nil {
  1868. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1869. return err
  1870. }
  1871. }
  1872. if err := zipWriter.Writer.Close(); err != nil {
  1873. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1874. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1875. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1876. }
  1877. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1878. }
  1879. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1880. params *EventParams,
  1881. ) error {
  1882. users, err := params.getUsers()
  1883. if err != nil {
  1884. return fmt.Errorf("unable to get users: %w", err)
  1885. }
  1886. var failures []string
  1887. executed := 0
  1888. for _, user := range users {
  1889. // if sender is set, the conditions have already been evaluated
  1890. if params.sender == "" {
  1891. if !checkUserConditionOptions(&user, &conditions) {
  1892. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1893. user.Username)
  1894. continue
  1895. }
  1896. }
  1897. executed++
  1898. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1899. failures = append(failures, user.Username)
  1900. params.AddError(err)
  1901. }
  1902. }
  1903. if len(failures) > 0 {
  1904. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1905. }
  1906. if executed == 0 {
  1907. eventManagerLog(logger.LevelError, "no existence check executed")
  1908. return errors.New("no existence check executed")
  1909. }
  1910. return nil
  1911. }
  1912. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1913. conditions dataprovider.ConditionOptions, params *EventParams,
  1914. ) error {
  1915. users, err := params.getUsers()
  1916. if err != nil {
  1917. return fmt.Errorf("unable to get users: %w", err)
  1918. }
  1919. var failures []string
  1920. executed := 0
  1921. for _, user := range users {
  1922. // if sender is set, the conditions have already been evaluated
  1923. if params.sender == "" {
  1924. if !checkUserConditionOptions(&user, &conditions) {
  1925. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1926. user.Username)
  1927. continue
  1928. }
  1929. }
  1930. executed++
  1931. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1932. failures = append(failures, user.Username)
  1933. params.AddError(err)
  1934. }
  1935. }
  1936. if len(failures) > 0 {
  1937. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1938. }
  1939. if executed == 0 {
  1940. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1941. return errors.New("no file/folder compressed")
  1942. }
  1943. return nil
  1944. }
  1945. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1946. params *EventParams,
  1947. ) error {
  1948. addObjectData := false
  1949. replacements := params.getStringReplacements(addObjectData, false)
  1950. replacer := strings.NewReplacer(replacements...)
  1951. switch c.Type {
  1952. case dataprovider.FilesystemActionRename:
  1953. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1954. case dataprovider.FilesystemActionDelete:
  1955. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1956. case dataprovider.FilesystemActionMkdirs:
  1957. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1958. case dataprovider.FilesystemActionExist:
  1959. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1960. case dataprovider.FilesystemActionCompress:
  1961. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1962. case dataprovider.FilesystemActionCopy:
  1963. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1964. default:
  1965. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1966. }
  1967. }
  1968. func executeQuotaResetForUser(user *dataprovider.User) error {
  1969. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1970. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1971. user.Username, err)
  1972. return err
  1973. }
  1974. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1975. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1976. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1977. }
  1978. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1979. numFiles, size, err := user.ScanQuota()
  1980. if err != nil {
  1981. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1982. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1983. }
  1984. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1985. if err != nil {
  1986. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1987. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1988. }
  1989. return nil
  1990. }
  1991. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1992. users, err := params.getUsers()
  1993. if err != nil {
  1994. return fmt.Errorf("unable to get users: %w", err)
  1995. }
  1996. var failures []string
  1997. executed := 0
  1998. for _, user := range users {
  1999. // if sender is set, the conditions have already been evaluated
  2000. if params.sender == "" {
  2001. if !checkUserConditionOptions(&user, &conditions) {
  2002. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  2003. user.Username)
  2004. continue
  2005. }
  2006. }
  2007. executed++
  2008. if err = executeQuotaResetForUser(&user); err != nil {
  2009. params.AddError(err)
  2010. failures = append(failures, user.Username)
  2011. }
  2012. }
  2013. if len(failures) > 0 {
  2014. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  2015. }
  2016. if executed == 0 {
  2017. eventManagerLog(logger.LevelError, "no user quota reset executed")
  2018. return errors.New("no user quota reset executed")
  2019. }
  2020. return nil
  2021. }
  2022. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2023. folders, err := params.getFolders()
  2024. if err != nil {
  2025. return fmt.Errorf("unable to get folders: %w", err)
  2026. }
  2027. var failures []string
  2028. executed := 0
  2029. for _, folder := range folders {
  2030. // if sender is set, the conditions have already been evaluated
  2031. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  2032. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  2033. folder.Name)
  2034. continue
  2035. }
  2036. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  2037. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  2038. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  2039. failures = append(failures, folder.Name)
  2040. continue
  2041. }
  2042. executed++
  2043. f := vfs.VirtualFolder{
  2044. BaseVirtualFolder: folder,
  2045. VirtualPath: "/",
  2046. }
  2047. numFiles, size, err := f.ScanQuota()
  2048. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  2049. if err != nil {
  2050. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  2051. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  2052. failures = append(failures, folder.Name)
  2053. continue
  2054. }
  2055. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  2056. if err != nil {
  2057. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  2058. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  2059. failures = append(failures, folder.Name)
  2060. }
  2061. }
  2062. if len(failures) > 0 {
  2063. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  2064. }
  2065. if executed == 0 {
  2066. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  2067. return errors.New("no folder quota reset executed")
  2068. }
  2069. return nil
  2070. }
  2071. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2072. users, err := params.getUsers()
  2073. if err != nil {
  2074. return fmt.Errorf("unable to get users: %w", err)
  2075. }
  2076. var failures []string
  2077. executed := 0
  2078. for _, user := range users {
  2079. // if sender is set, the conditions have already been evaluated
  2080. if params.sender == "" {
  2081. if !checkUserConditionOptions(&user, &conditions) {
  2082. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  2083. user.Username)
  2084. continue
  2085. }
  2086. }
  2087. executed++
  2088. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  2089. if err != nil {
  2090. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  2091. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  2092. failures = append(failures, user.Username)
  2093. }
  2094. }
  2095. if len(failures) > 0 {
  2096. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  2097. }
  2098. if executed == 0 {
  2099. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  2100. return errors.New("no transfer quota reset executed")
  2101. }
  2102. return nil
  2103. }
  2104. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  2105. params *EventParams, actionName string,
  2106. ) error {
  2107. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2108. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  2109. user.Username, err)
  2110. return err
  2111. }
  2112. check := RetentionCheck{
  2113. Folders: folders,
  2114. }
  2115. c := RetentionChecks.Add(check, &user)
  2116. if c == nil {
  2117. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  2118. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2119. }
  2120. defer func() {
  2121. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2122. Username: user.Username,
  2123. ActionName: actionName,
  2124. Results: c.results,
  2125. })
  2126. }()
  2127. if err := c.Start(); err != nil {
  2128. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2129. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2130. }
  2131. return nil
  2132. }
  2133. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2134. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2135. ) error {
  2136. users, err := params.getUsers()
  2137. if err != nil {
  2138. return fmt.Errorf("unable to get users: %w", err)
  2139. }
  2140. var failures []string
  2141. executed := 0
  2142. for _, user := range users {
  2143. // if sender is set, the conditions have already been evaluated
  2144. if params.sender == "" {
  2145. if !checkUserConditionOptions(&user, &conditions) {
  2146. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2147. user.Username)
  2148. continue
  2149. }
  2150. }
  2151. executed++
  2152. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2153. failures = append(failures, user.Username)
  2154. params.AddError(err)
  2155. }
  2156. }
  2157. if len(failures) > 0 {
  2158. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2159. }
  2160. if executed == 0 {
  2161. eventManagerLog(logger.LevelError, "no retention check executed")
  2162. return errors.New("no retention check executed")
  2163. }
  2164. return nil
  2165. }
  2166. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2167. users, err := params.getUsers()
  2168. if err != nil {
  2169. return fmt.Errorf("unable to get users: %w", err)
  2170. }
  2171. var failures []string
  2172. var executed int
  2173. for _, user := range users {
  2174. // if sender is set, the conditions have already been evaluated
  2175. if params.sender == "" {
  2176. if !checkUserConditionOptions(&user, &conditions) {
  2177. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2178. user.Username)
  2179. continue
  2180. }
  2181. }
  2182. executed++
  2183. if user.ExpirationDate > 0 {
  2184. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2185. if expDate.Before(time.Now()) {
  2186. failures = append(failures, user.Username)
  2187. }
  2188. }
  2189. }
  2190. if len(failures) > 0 {
  2191. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2192. }
  2193. if executed == 0 {
  2194. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2195. return errors.New("no user expiration check executed")
  2196. }
  2197. return nil
  2198. }
  2199. func executeInactivityCheckForUser(user *dataprovider.User, config dataprovider.EventActionUserInactivity, when time.Time) error {
  2200. if config.DeleteThreshold > 0 && (user.Status == 0 || config.DisableThreshold == 0) {
  2201. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DeleteThreshold {
  2202. err := dataprovider.DeleteUser(user.Username, dataprovider.ActionExecutorSystem, "", "")
  2203. eventManagerLog(logger.LevelInfo, "deleting inactive user %q, days of inactivity: %d/%d, err: %v",
  2204. user.Username, inactivityDays, config.DeleteThreshold, err)
  2205. if err != nil {
  2206. return fmt.Errorf("unable to delete inactive user %q", user.Username)
  2207. }
  2208. return fmt.Errorf("inactive user %q deleted. Number of days of inactivity: %d", user.Username, inactivityDays)
  2209. }
  2210. }
  2211. if config.DisableThreshold > 0 && user.Status > 0 {
  2212. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DisableThreshold {
  2213. user.Status = 0
  2214. err := dataprovider.UpdateUser(user, dataprovider.ActionExecutorSystem, "", "")
  2215. eventManagerLog(logger.LevelInfo, "disabling inactive user %q, days of inactivity: %d/%d, err: %v",
  2216. user.Username, inactivityDays, config.DisableThreshold, err)
  2217. if err != nil {
  2218. return fmt.Errorf("unable to disable inactive user %q", user.Username)
  2219. }
  2220. return fmt.Errorf("inactive user %q disabled. Number of days of inactivity: %d", user.Username, inactivityDays)
  2221. }
  2222. }
  2223. return nil
  2224. }
  2225. func executeUserInactivityCheckRuleAction(config dataprovider.EventActionUserInactivity,
  2226. conditions dataprovider.ConditionOptions,
  2227. params *EventParams,
  2228. when time.Time,
  2229. ) error {
  2230. users, err := params.getUsers()
  2231. if err != nil {
  2232. return fmt.Errorf("unable to get users: %w", err)
  2233. }
  2234. var failures []string
  2235. for _, user := range users {
  2236. // if sender is set, the conditions have already been evaluated
  2237. if params.sender == "" {
  2238. if !checkUserConditionOptions(&user, &conditions) {
  2239. eventManagerLog(logger.LevelDebug, "skipping inactivity check for user %q, condition options don't match",
  2240. user.Username)
  2241. continue
  2242. }
  2243. }
  2244. if err = executeInactivityCheckForUser(&user, config, when); err != nil {
  2245. params.AddError(err)
  2246. failures = append(failures, user.Username)
  2247. }
  2248. }
  2249. if len(failures) > 0 {
  2250. return fmt.Errorf("executed inactivity check actions for users: %s", strings.Join(failures, ", "))
  2251. }
  2252. return nil
  2253. }
  2254. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2255. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2256. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2257. user.Username, err)
  2258. return err
  2259. }
  2260. if user.ExpirationDate > 0 {
  2261. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2262. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2263. user.Username, expDate)
  2264. return nil
  2265. }
  2266. }
  2267. if user.Filters.PasswordExpiration == 0 {
  2268. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2269. return nil
  2270. }
  2271. days := user.PasswordExpiresIn()
  2272. if days > config.Threshold {
  2273. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2274. user.Username, days, config.Threshold)
  2275. return nil
  2276. }
  2277. body := new(bytes.Buffer)
  2278. data := make(map[string]any)
  2279. data["Username"] = user.Username
  2280. data["Days"] = days
  2281. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2282. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2283. user.Username, err)
  2284. return err
  2285. }
  2286. subject := "SFTPGo password expiration notification"
  2287. startTime := time.Now()
  2288. if err := smtp.SendEmail([]string{user.Email}, nil, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2289. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2290. user.Username, err, time.Since(startTime))
  2291. return err
  2292. }
  2293. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2294. user.Username, days, time.Since(startTime))
  2295. return nil
  2296. }
  2297. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2298. params *EventParams) error {
  2299. users, err := params.getUsers()
  2300. if err != nil {
  2301. return fmt.Errorf("unable to get users: %w", err)
  2302. }
  2303. var failures []string
  2304. for _, user := range users {
  2305. // if sender is set, the conditions have already been evaluated
  2306. if params.sender == "" {
  2307. if !checkUserConditionOptions(&user, &conditions) {
  2308. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2309. user.Username)
  2310. continue
  2311. }
  2312. }
  2313. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2314. params.AddError(err)
  2315. failures = append(failures, user.Username)
  2316. }
  2317. }
  2318. if len(failures) > 0 {
  2319. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2320. }
  2321. return nil
  2322. }
  2323. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2324. admin, err := dataprovider.AdminExists(params.Name)
  2325. exists := err == nil
  2326. if exists && c.Mode == 1 {
  2327. return &admin, nil
  2328. }
  2329. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2330. return nil, err
  2331. }
  2332. replacements := params.getStringReplacements(false, true)
  2333. replacer := strings.NewReplacer(replacements...)
  2334. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2335. var newAdmin dataprovider.Admin
  2336. err = json.Unmarshal(util.StringToBytes(data), &newAdmin)
  2337. if err != nil {
  2338. return nil, err
  2339. }
  2340. if newAdmin.Password == "" {
  2341. newAdmin.Password = util.GenerateUniqueID()
  2342. }
  2343. if exists {
  2344. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2345. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2346. } else {
  2347. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2348. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2349. }
  2350. return &newAdmin, err
  2351. }
  2352. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2353. user, err := dataprovider.UserExists(params.Name, "")
  2354. exists := err == nil
  2355. if exists && c.Mode == 1 {
  2356. err = user.LoadAndApplyGroupSettings()
  2357. return &user, err
  2358. }
  2359. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2360. return nil, err
  2361. }
  2362. replacements := params.getStringReplacements(false, true)
  2363. replacer := strings.NewReplacer(replacements...)
  2364. data := replaceWithReplacer(c.TemplateUser, replacer)
  2365. var newUser dataprovider.User
  2366. err = json.Unmarshal(util.StringToBytes(data), &newUser)
  2367. if err != nil {
  2368. return nil, err
  2369. }
  2370. if exists {
  2371. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2372. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2373. } else {
  2374. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2375. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2376. }
  2377. if err != nil {
  2378. return nil, err
  2379. }
  2380. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2381. return &u, err
  2382. }
  2383. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams, //nolint:gocyclo
  2384. conditions dataprovider.ConditionOptions,
  2385. ) error {
  2386. var err error
  2387. switch action.Type {
  2388. case dataprovider.ActionTypeHTTP:
  2389. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2390. case dataprovider.ActionTypeCommand:
  2391. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2392. case dataprovider.ActionTypeEmail:
  2393. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2394. case dataprovider.ActionTypeBackup:
  2395. var backupPath string
  2396. backupPath, err = dataprovider.ExecuteBackup()
  2397. if err == nil {
  2398. params.setBackupParams(backupPath)
  2399. }
  2400. case dataprovider.ActionTypeUserQuotaReset:
  2401. err = executeUsersQuotaResetRuleAction(conditions, params)
  2402. case dataprovider.ActionTypeFolderQuotaReset:
  2403. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2404. case dataprovider.ActionTypeTransferQuotaReset:
  2405. err = executeTransferQuotaResetRuleAction(conditions, params)
  2406. case dataprovider.ActionTypeDataRetentionCheck:
  2407. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2408. case dataprovider.ActionTypeFilesystem:
  2409. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2410. case dataprovider.ActionTypePasswordExpirationCheck:
  2411. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2412. case dataprovider.ActionTypeUserExpirationCheck:
  2413. err = executeUserExpirationCheckRuleAction(conditions, params)
  2414. case dataprovider.ActionTypeUserInactivityCheck:
  2415. err = executeUserInactivityCheckRuleAction(action.Options.UserInactivityConfig, conditions, params, time.Now())
  2416. case dataprovider.ActionTypeRotateLogs:
  2417. err = logger.RotateLogFile()
  2418. default:
  2419. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2420. }
  2421. if err != nil {
  2422. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2423. }
  2424. params.AddError(err)
  2425. return err
  2426. }
  2427. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2428. *dataprovider.Admin, error,
  2429. ) {
  2430. for _, action := range rule.Actions {
  2431. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2432. startTime := time.Now()
  2433. var user *dataprovider.User
  2434. var admin *dataprovider.Admin
  2435. var err error
  2436. var failedActions []string
  2437. paramsCopy := params.getACopy()
  2438. switch params.Event {
  2439. case IDPLoginAdmin:
  2440. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2441. case IDPLoginUser:
  2442. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2443. default:
  2444. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2445. }
  2446. if err != nil {
  2447. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2448. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2449. failedActions = append(failedActions, action.Name)
  2450. } else {
  2451. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2452. action.Name, rule.Name, time.Since(startTime))
  2453. }
  2454. // execute async actions if any, including failure actions
  2455. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2456. return user, admin, err
  2457. }
  2458. }
  2459. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2460. return nil, nil, errors.New("no action executed")
  2461. }
  2462. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2463. var errRes error
  2464. for _, rule := range rules {
  2465. var failedActions []string
  2466. paramsCopy := params.getACopy()
  2467. for _, action := range rule.Actions {
  2468. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2469. startTime := time.Now()
  2470. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2471. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2472. action.Name, rule.Name, time.Since(startTime), err)
  2473. failedActions = append(failedActions, action.Name)
  2474. // we return the last error, it is ok for now
  2475. errRes = err
  2476. if action.Options.StopOnFailure {
  2477. break
  2478. }
  2479. } else {
  2480. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2481. action.Name, rule.Name, time.Since(startTime))
  2482. }
  2483. }
  2484. }
  2485. // execute async actions if any, including failure actions
  2486. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2487. }
  2488. return errRes
  2489. }
  2490. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2491. eventManager.addAsyncTask()
  2492. defer eventManager.removeAsyncTask()
  2493. params.addUID()
  2494. for _, rule := range rules {
  2495. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2496. }
  2497. }
  2498. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2499. for _, action := range rule.Actions {
  2500. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2501. startTime := time.Now()
  2502. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2503. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2504. action.Name, rule.Name, time.Since(startTime), err)
  2505. failedActions = append(failedActions, action.Name)
  2506. if action.Options.StopOnFailure {
  2507. break
  2508. }
  2509. } else {
  2510. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2511. action.Name, rule.Name, time.Since(startTime))
  2512. }
  2513. }
  2514. }
  2515. if len(failedActions) > 0 {
  2516. params.updateStatusFromError = false
  2517. // execute failure actions
  2518. for _, action := range rule.Actions {
  2519. if action.Options.IsFailureAction {
  2520. startTime := time.Now()
  2521. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2522. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2523. action.Name, rule.Name, time.Since(startTime), err)
  2524. if action.Options.StopOnFailure {
  2525. break
  2526. }
  2527. } else {
  2528. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2529. action.Name, rule.Name, time.Since(startTime))
  2530. }
  2531. }
  2532. }
  2533. }
  2534. }
  2535. type eventCronJob struct {
  2536. ruleName string
  2537. }
  2538. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2539. if rule.GuardFromConcurrentExecution() {
  2540. task, err := dataprovider.GetTaskByName(rule.Name)
  2541. if err != nil {
  2542. if errors.Is(err, util.ErrNotFound) {
  2543. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2544. task = dataprovider.Task{
  2545. Name: rule.Name,
  2546. UpdateAt: 0,
  2547. Version: 0,
  2548. }
  2549. err = dataprovider.AddTask(rule.Name)
  2550. if err != nil {
  2551. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2552. return task, err
  2553. }
  2554. } else {
  2555. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2556. }
  2557. }
  2558. return task, err
  2559. }
  2560. return dataprovider.Task{}, nil
  2561. }
  2562. func (j *eventCronJob) Run() {
  2563. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2564. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2565. if err != nil {
  2566. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2567. return
  2568. }
  2569. if err := rule.CheckActionsConsistency(""); err != nil {
  2570. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2571. return
  2572. }
  2573. task, err := j.getTask(&rule)
  2574. if err != nil {
  2575. return
  2576. }
  2577. if task.Name != "" {
  2578. updateInterval := 5 * time.Minute
  2579. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2580. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2581. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2582. return
  2583. }
  2584. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2585. if err != nil {
  2586. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2587. rule.Name, err)
  2588. return
  2589. }
  2590. ticker := time.NewTicker(updateInterval)
  2591. done := make(chan bool)
  2592. defer func() {
  2593. done <- true
  2594. ticker.Stop()
  2595. }()
  2596. go func(taskName string) {
  2597. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2598. for {
  2599. select {
  2600. case <-done:
  2601. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2602. return
  2603. case <-ticker.C:
  2604. err := dataprovider.UpdateTaskTimestamp(taskName)
  2605. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2606. }
  2607. }
  2608. }(task.Name)
  2609. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2610. } else {
  2611. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2612. }
  2613. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2614. }
  2615. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2616. func RunOnDemandRule(name string) error {
  2617. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2618. rule, err := dataprovider.EventRuleExists(name)
  2619. if err != nil {
  2620. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2621. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2622. }
  2623. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2624. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2625. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2626. }
  2627. if rule.Status != 1 {
  2628. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2629. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2630. }
  2631. if err := rule.CheckActionsConsistency(""); err != nil {
  2632. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2633. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2634. }
  2635. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2636. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2637. return nil
  2638. }
  2639. type zipWriterWrapper struct {
  2640. Name string
  2641. Entries map[string]bool
  2642. Writer *zip.Writer
  2643. }
  2644. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2645. logger.Log(level, "eventmanager", "", format, v...)
  2646. }