eventmanager.go 94 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "slices"
  33. "strconv"
  34. "strings"
  35. "sync"
  36. "sync/atomic"
  37. "time"
  38. "github.com/bmatcuk/doublestar/v4"
  39. "github.com/klauspost/compress/zip"
  40. "github.com/robfig/cron/v3"
  41. "github.com/rs/xid"
  42. "github.com/sftpgo/sdk"
  43. "github.com/wneessen/go-mail"
  44. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  45. "github.com/drakkan/sftpgo/v2/internal/logger"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/smtp"
  48. "github.com/drakkan/sftpgo/v2/internal/util"
  49. "github.com/drakkan/sftpgo/v2/internal/vfs"
  50. )
  51. const (
  52. ipBlockedEventName = "IP Blocked"
  53. maxAttachmentsSize = int64(10 * 1024 * 1024) // 10 MiB, expressed in bytes
  54. objDataPlaceholder = "{{ObjectData}}"
  55. objDataPlaceholderString = "{{ObjectDataString}}"
  56. dateTimeMillisFormat = "2006-01-02T15:04:05.000" // time layout with millisecond precision and no zone
  57. )
  58. // Supported IDP login events. checkIPDLoginEventMatch compares these values with EventParams.Event.
  59. const (
  60. IDPLoginUser = "IDP login user"
  61. IDPLoginAdmin = "IDP login admin"
  62. )
  63. var (
  64. // eventManager handles the supported event rules actions; it is initialized in init()
  65. eventManager eventRulesContainer
  66. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"") // prefixes backslashes and double quotes with a backslash
  67. )
  68. func init() {
  69. eventManager = eventRulesContainer{
  70. schedulesMapping: make(map[string][]cron.EntryID),
  71. // arbitrary maximum number of concurrent asynchronous tasks,
  72. // each task could execute multiple actions
  73. concurrencyGuard: make(chan struct{}, 200),
  74. }
  75. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  76. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  77. p := EventParams{
  78. Name: executor,
  79. ObjectName: objectName,
  80. Event: operation,
  81. Status: 1,
  82. ObjectType: objectType,
  83. IP: ip,
  84. Role: role,
  85. Timestamp: time.Now(),
  86. Object: object,
  87. }
  88. if u, ok := object.(*dataprovider.User); ok {
  89. p.Email = u.Email
  90. p.Groups = u.Groups
  91. } else if a, ok := object.(*dataprovider.Admin); ok {
  92. p.Email = a.Email
  93. }
  94. eventManager.handleProviderEvent(p)
  95. })
  96. }
  97. // HandleCertificateEvent checks and executes action rules for certificate events
  98. func HandleCertificateEvent(params EventParams) {
  99. eventManager.handleCertificateEvent(params)
  100. }
  101. // HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider
  102. func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
  103. return eventManager.handleIDPLoginEvent(params, customFields)
  104. }
  105. // eventRulesContainer stores the active event rules grouped by trigger, plus the cron entries registered for scheduled rules
  106. type eventRulesContainer struct {
  107. sync.RWMutex // guards the rule slices and schedulesMapping: RemoveRule/loadRules take Lock, the handlers take RLock
  108. lastLoad atomic.Int64 // value stored by loadRules and passed to dataprovider.GetRecentlyUpdatedRules on the next load
  109. FsEvents []dataprovider.EventRule // rules with trigger EventTriggerFsEvent
  110. ProviderEvents []dataprovider.EventRule // rules with trigger EventTriggerProviderEvent
  111. Schedules []dataprovider.EventRule // rules with trigger EventTriggerSchedule
  112. IPBlockedEvents []dataprovider.EventRule // rules with trigger EventTriggerIPBlocked
  113. CertificateEvents []dataprovider.EventRule // rules with trigger EventTriggerCertificate
  114. IPDLoginEvents []dataprovider.EventRule // rules with trigger EventTriggerIDPLogin (the field name spells it "IPD")
  115. schedulesMapping map[string][]cron.EntryID // rule name -> cron entry IDs registered for that rule
  116. concurrencyGuard chan struct{} // buffered channel: addAsyncTask sends on it, removeAsyncTask receives from it
  117. }
  118. func (r *eventRulesContainer) addAsyncTask() {
  119. activeHooks.Add(1)
  120. r.concurrencyGuard <- struct{}{}
  121. }
  122. func (r *eventRulesContainer) removeAsyncTask() {
  123. activeHooks.Add(-1)
  124. <-r.concurrencyGuard
  125. }
  126. func (r *eventRulesContainer) getLastLoadTime() int64 {
  127. return r.lastLoad.Load()
  128. }
  129. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  130. r.lastLoad.Store(modTime)
  131. }
  132. // RemoveRule deletes the rule with the specified name
  133. func (r *eventRulesContainer) RemoveRule(name string) {
  134. r.Lock()
  135. defer r.Unlock()
  136. r.removeRuleInternal(name)
  137. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  138. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  139. }
  140. func (r *eventRulesContainer) removeRuleInternal(name string) {
  141. for idx := range r.FsEvents {
  142. if r.FsEvents[idx].Name == name {
  143. lastIdx := len(r.FsEvents) - 1
  144. r.FsEvents[idx] = r.FsEvents[lastIdx]
  145. r.FsEvents = r.FsEvents[:lastIdx]
  146. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  147. return
  148. }
  149. }
  150. for idx := range r.ProviderEvents {
  151. if r.ProviderEvents[idx].Name == name {
  152. lastIdx := len(r.ProviderEvents) - 1
  153. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  154. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  155. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  156. return
  157. }
  158. }
  159. for idx := range r.IPBlockedEvents {
  160. if r.IPBlockedEvents[idx].Name == name {
  161. lastIdx := len(r.IPBlockedEvents) - 1
  162. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  163. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  164. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  165. return
  166. }
  167. }
  168. for idx := range r.CertificateEvents {
  169. if r.CertificateEvents[idx].Name == name {
  170. lastIdx := len(r.CertificateEvents) - 1
  171. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  172. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  173. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  174. return
  175. }
  176. }
  177. for idx := range r.IPDLoginEvents {
  178. if r.IPDLoginEvents[idx].Name == name {
  179. lastIdx := len(r.IPDLoginEvents) - 1
  180. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  181. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  182. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  183. return
  184. }
  185. }
  186. for idx := range r.Schedules {
  187. if r.Schedules[idx].Name == name {
  188. if schedules, ok := r.schedulesMapping[name]; ok {
  189. for _, entryID := range schedules {
  190. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  191. eventScheduler.Remove(entryID)
  192. }
  193. delete(r.schedulesMapping, name)
  194. }
  195. lastIdx := len(r.Schedules) - 1
  196. r.Schedules[idx] = r.Schedules[lastIdx]
  197. r.Schedules = r.Schedules[:lastIdx]
  198. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  199. return
  200. }
  201. }
  202. }
  203. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  204. r.removeRuleInternal(rule.Name)
  205. if rule.DeletedAt > 0 {
  206. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  207. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  208. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  209. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  210. }
  211. return
  212. }
  213. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  214. return
  215. }
  216. switch rule.Trigger {
  217. case dataprovider.EventTriggerFsEvent:
  218. r.FsEvents = append(r.FsEvents, rule)
  219. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  220. case dataprovider.EventTriggerProviderEvent:
  221. r.ProviderEvents = append(r.ProviderEvents, rule)
  222. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  223. case dataprovider.EventTriggerIPBlocked:
  224. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  225. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  226. case dataprovider.EventTriggerCertificate:
  227. r.CertificateEvents = append(r.CertificateEvents, rule)
  228. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  229. case dataprovider.EventTriggerIDPLogin:
  230. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  231. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  232. case dataprovider.EventTriggerSchedule:
  233. for _, schedule := range rule.Conditions.Schedules {
  234. cronSpec := schedule.GetCronSpec()
  235. job := &eventCronJob{
  236. ruleName: dataprovider.ConvertName(rule.Name),
  237. }
  238. entryID, err := eventScheduler.AddJob(cronSpec, job)
  239. if err != nil {
  240. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  241. return
  242. }
  243. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  244. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  245. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  246. }
  247. r.Schedules = append(r.Schedules, rule)
  248. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  249. default:
  250. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  251. }
  252. }
  253. func (r *eventRulesContainer) loadRules() {
  254. eventManagerLog(logger.LevelDebug, "loading updated rules")
  255. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  256. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  257. if err != nil {
  258. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  259. return
  260. }
  261. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  262. if len(rules) > 0 {
  263. r.Lock()
  264. defer r.Unlock()
  265. for _, rule := range rules {
  266. r.addUpdateRuleInternal(rule)
  267. }
  268. }
  269. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  270. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  271. r.setLastLoadTime(modTime)
  272. }
  273. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  274. switch conditions.IDPLoginEvent {
  275. case dataprovider.IDPLoginUser:
  276. if params.Event != IDPLoginUser {
  277. return false
  278. }
  279. case dataprovider.IDPLoginAdmin:
  280. if params.Event != IDPLoginAdmin {
  281. return false
  282. }
  283. }
  284. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  285. }
  286. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  287. if !util.Contains(conditions.ProviderEvents, params.Event) {
  288. return false
  289. }
  290. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  291. return false
  292. }
  293. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  294. return false
  295. }
  296. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  297. return false
  298. }
  299. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  300. return false
  301. }
  302. return true
  303. }
  304. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  305. if !util.Contains(conditions.FsEvents, params.Event) {
  306. return false
  307. }
  308. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  309. return false
  310. }
  311. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  312. return false
  313. }
  314. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  315. return false
  316. }
  317. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  318. return false
  319. }
  320. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  321. return false
  322. }
  323. if params.Event == operationUpload || params.Event == operationDownload {
  324. if conditions.Options.MinFileSize > 0 {
  325. if params.FileSize < conditions.Options.MinFileSize {
  326. return false
  327. }
  328. }
  329. if conditions.Options.MaxFileSize > 0 {
  330. if params.FileSize > conditions.Options.MaxFileSize {
  331. return false
  332. }
  333. }
  334. }
  335. return true
  336. }
  337. // hasFsRules returns true if there are any rules for filesystem event triggers
  338. func (r *eventRulesContainer) hasFsRules() bool {
  339. r.RLock()
  340. defer r.RUnlock()
  341. return len(r.FsEvents) > 0
  342. }
  343. // handleFsEvent executes the rules actions defined for the specified event.
  344. // The boolean parameter indicates whether a sync action was executed
  345. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  346. if params.Protocol == protocolEventAction {
  347. return false, nil
  348. }
  349. r.RLock()
  350. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  351. for _, rule := range r.FsEvents {
  352. if r.checkFsEventMatch(&rule.Conditions, &params) {
  353. if err := rule.CheckActionsConsistency(""); err != nil {
  354. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  355. rule.Name, err, params.Event)
  356. continue
  357. }
  358. hasSyncActions := false
  359. for _, action := range rule.Actions {
  360. if action.Options.ExecuteSync {
  361. hasSyncActions = true
  362. break
  363. }
  364. }
  365. if hasSyncActions {
  366. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  367. } else {
  368. rulesAsync = append(rulesAsync, rule)
  369. }
  370. }
  371. }
  372. r.RUnlock()
  373. params.sender = params.Name
  374. params.addUID()
  375. if len(rulesAsync) > 0 {
  376. go executeAsyncRulesActions(rulesAsync, params)
  377. }
  378. if len(rulesWithSyncActions) > 0 {
  379. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  380. }
  381. return false, nil
  382. }
  383. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  384. *dataprovider.Admin, error,
  385. ) {
  386. r.RLock()
  387. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  388. for _, rule := range r.IPDLoginEvents {
  389. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  390. if err := rule.CheckActionsConsistency(""); err != nil {
  391. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  392. rule.Name, err, params.Event)
  393. continue
  394. }
  395. hasSyncActions := false
  396. for _, action := range rule.Actions {
  397. if action.Options.ExecuteSync {
  398. hasSyncActions = true
  399. break
  400. }
  401. }
  402. if hasSyncActions {
  403. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  404. } else {
  405. rulesAsync = append(rulesAsync, rule)
  406. }
  407. }
  408. }
  409. r.RUnlock()
  410. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  411. return nil, nil, nil
  412. }
  413. params.addIDPCustomFields(customFields)
  414. if len(rulesWithSyncActions) > 1 {
  415. var ruleNames []string
  416. for _, r := range rulesWithSyncActions {
  417. ruleNames = append(ruleNames, r.Name)
  418. }
  419. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  420. }
  421. params.addUID()
  422. if len(rulesAsync) > 0 {
  423. go executeAsyncRulesActions(rulesAsync, params)
  424. }
  425. if len(rulesWithSyncActions) > 0 {
  426. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  427. }
  428. return nil, nil, nil
  429. }
  430. // username is populated for user objects
  431. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  432. r.RLock()
  433. defer r.RUnlock()
  434. var rules []dataprovider.EventRule
  435. for _, rule := range r.ProviderEvents {
  436. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  437. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  438. rules = append(rules, rule)
  439. } else {
  440. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  441. rule.Name, err, params.Event, params.ObjectType)
  442. }
  443. }
  444. }
  445. if len(rules) > 0 {
  446. params.sender = params.ObjectName
  447. go executeAsyncRulesActions(rules, params)
  448. }
  449. }
  450. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  451. r.RLock()
  452. defer r.RUnlock()
  453. if len(r.IPBlockedEvents) == 0 {
  454. return
  455. }
  456. var rules []dataprovider.EventRule
  457. for _, rule := range r.IPBlockedEvents {
  458. if err := rule.CheckActionsConsistency(""); err == nil {
  459. rules = append(rules, rule)
  460. } else {
  461. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  462. rule.Name, err, params.Event)
  463. }
  464. }
  465. if len(rules) > 0 {
  466. go executeAsyncRulesActions(rules, params)
  467. }
  468. }
  469. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  470. r.RLock()
  471. defer r.RUnlock()
  472. if len(r.CertificateEvents) == 0 {
  473. return
  474. }
  475. var rules []dataprovider.EventRule
  476. for _, rule := range r.CertificateEvents {
  477. if err := rule.CheckActionsConsistency(""); err == nil {
  478. rules = append(rules, rule)
  479. } else {
  480. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  481. rule.Name, err, params.Event)
  482. }
  483. }
  484. if len(rules) > 0 {
  485. go executeAsyncRulesActions(rules, params)
  486. }
  487. }
  488. type executedRetentionCheck struct { // element type of EventParams.retentionChecks; getACopy duplicates each entry
  489. Username string
  490. ActionName string
  491. Results []folderRetentionCheckResult // copied element by element by getACopy
  492. }
  493. // EventParams defines the supported event parameters
  494. type EventParams struct {
  495. Name string // matched against the rule name patterns; handleFsEvent copies it to sender
  496. Groups []sdk.GroupMapping // matched against the rule group name patterns
  497. Event string // event/operation name compared with the events listed in the rule conditions
  498. Status int // 1 is reported as "OK" by getStatusString, any other value as "KO"
  499. VirtualPath string // matched against the rule filesystem path patterns
  500. FsPath string
  501. VirtualTargetPath string
  502. FsTargetPath string
  503. ObjectName string // handleProviderEvent copies it to sender
  504. Extension string
  505. ObjectType string // checked against the provider objects listed in the rule, if any
  506. FileSize int64 // compared with the min/max file size conditions for upload and download events
  507. Elapsed int64
  508. Protocol string // checked against the protocols listed in the rule, if any
  509. IP string
  510. Role string // matched against the rule role name patterns
  511. Email string
  512. Timestamp time.Time
  513. UID string // generated by addUID when empty
  514. IDPCustomFields *map[string]string // string-valued custom fields set by addIDPCustomFields; non-string values are dropped
  515. Object plugin.Renderer
  516. Metadata map[string]string // copied into a new map by getACopy when not empty
  517. sender string // when empty, getUsers returns all users, otherwise only the user resolved from it
  518. updateStatusFromError bool // when true, AddError changes Status from 1 to 2
  519. errors []string // error messages appended by AddError
  520. retentionChecks []executedRetentionCheck
  521. }
  522. func (p *EventParams) getACopy() *EventParams {
  523. params := *p
  524. params.errors = make([]string, len(p.errors))
  525. copy(params.errors, p.errors)
  526. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  527. for _, c := range p.retentionChecks {
  528. executedCheck := executedRetentionCheck{
  529. Username: c.Username,
  530. ActionName: c.ActionName,
  531. }
  532. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  533. copy(executedCheck.Results, c.Results)
  534. retentionChecks = append(retentionChecks, executedCheck)
  535. }
  536. params.retentionChecks = retentionChecks
  537. if p.IDPCustomFields != nil {
  538. fields := make(map[string]string)
  539. for k, v := range *p.IDPCustomFields {
  540. fields[k] = v
  541. }
  542. params.IDPCustomFields = &fields
  543. }
  544. if len(params.Metadata) > 0 {
  545. metadata := make(map[string]string)
  546. for k, v := range p.Metadata {
  547. metadata[k] = v
  548. }
  549. params.Metadata = metadata
  550. }
  551. return &params
  552. }
  553. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  554. if customFields == nil || len(*customFields) == 0 {
  555. return
  556. }
  557. fields := make(map[string]string)
  558. for k, v := range *customFields {
  559. switch val := v.(type) {
  560. case string:
  561. fields[k] = val
  562. }
  563. }
  564. p.IDPCustomFields = &fields
  565. }
  566. // AddError adds a new error to the event params and update the status if needed
  567. func (p *EventParams) AddError(err error) {
  568. if err == nil {
  569. return
  570. }
  571. if p.updateStatusFromError && p.Status == 1 {
  572. p.Status = 2
  573. }
  574. p.errors = append(p.errors, err.Error())
  575. }
  576. func (p *EventParams) addUID() {
  577. if p.UID == "" {
  578. p.UID = util.GenerateUniqueID()
  579. }
  580. }
  581. func (p *EventParams) setBackupParams(backupPath string) {
  582. if p.sender != "" {
  583. return
  584. }
  585. p.sender = dataprovider.ActionExecutorSystem
  586. p.FsPath = backupPath
  587. p.ObjectName = filepath.Base(backupPath)
  588. p.VirtualPath = "/" + p.ObjectName
  589. p.Timestamp = time.Now()
  590. info, err := os.Stat(backupPath)
  591. if err == nil {
  592. p.FileSize = info.Size()
  593. }
  594. }
  595. func (p *EventParams) getStatusString() string {
  596. switch p.Status {
  597. case 1:
  598. return "OK"
  599. default:
  600. return "KO"
  601. }
  602. }
  603. // getUsers returns users with group settings not applied
  604. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  605. if p.sender == "" {
  606. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeUsers})
  607. if err != nil {
  608. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  609. return nil, errors.New("unable to get users")
  610. }
  611. return dump.Users, nil
  612. }
  613. user, err := p.getUserFromSender()
  614. if err != nil {
  615. return nil, err
  616. }
  617. return []dataprovider.User{user}, nil
  618. }
  619. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  620. if p.sender == dataprovider.ActionExecutorSystem {
  621. return dataprovider.User{
  622. BaseUser: sdk.BaseUser{
  623. Status: 1,
  624. Username: p.sender,
  625. HomeDir: dataprovider.GetBackupsPath(),
  626. Permissions: map[string][]string{
  627. "/": {dataprovider.PermAny},
  628. },
  629. },
  630. }, nil
  631. }
  632. user, err := dataprovider.UserExists(p.sender, "")
  633. if err != nil {
  634. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  635. return user, fmt.Errorf("error getting user %q", p.sender)
  636. }
  637. return user, nil
  638. }
  639. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  640. if p.sender == "" {
  641. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeFolders})
  642. return dump.Folders, err
  643. }
  644. folder, err := dataprovider.GetFolderByName(p.sender)
  645. if err != nil {
  646. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  647. }
  648. return []vfs.BaseVirtualFolder{folder}, nil
  649. }
  650. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  651. if len(p.retentionChecks) == 0 {
  652. return nil, errors.New("no data retention report available")
  653. }
  654. var b bytes.Buffer
  655. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  656. return nil, err
  657. }
  658. return b.Bytes(), nil
  659. }
  660. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  661. var n int64
  662. wr := zip.NewWriter(w)
  663. for _, check := range p.retentionChecks {
  664. data, err := getCSVRetentionReport(check.Results)
  665. if err != nil {
  666. return n, fmt.Errorf("unable to get CSV report: %w", err)
  667. }
  668. dataSize := int64(len(data))
  669. n += dataSize
  670. // we suppose a 3:1 compression ratio
  671. if n > (maxAttachmentsSize * 3) {
  672. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  673. util.ByteCountIEC(n))
  674. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  675. }
  676. fh := &zip.FileHeader{
  677. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  678. Method: zip.Deflate,
  679. Modified: time.Now().UTC(),
  680. }
  681. f, err := wr.CreateHeader(fh)
  682. if err != nil {
  683. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  684. }
  685. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  686. if err != nil {
  687. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  688. }
  689. }
  690. if err := wr.Close(); err != nil {
  691. return n, fmt.Errorf("unable to close zip writer: %w", err)
  692. }
  693. return n, nil
  694. }
  695. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  696. if len(p.retentionChecks) == 0 {
  697. return nil, errors.New("no data retention report available")
  698. }
  699. return &mail.File{
  700. Name: "retention-reports.zip",
  701. Header: make(map[string][]string),
  702. Writer: p.writeCompressedDataRetentionReports,
  703. }, nil
  704. }
  705. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  706. if jsonEscaped {
  707. return util.JSONEscape(val)
  708. }
  709. return val
  710. }
  711. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  712. replacements := []string{
  713. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  714. "{{Event}}", p.Event,
  715. "{{Status}}", fmt.Sprintf("%d", p.Status),
  716. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  717. "{{EscapedVirtualPath}}", p.getStringReplacement(url.QueryEscape(p.VirtualPath), jsonEscaped),
  718. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  719. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  720. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  721. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  722. "{{ObjectType}}", p.ObjectType,
  723. "{{FileSize}}", strconv.FormatInt(p.FileSize, 10),
  724. "{{Elapsed}}", strconv.FormatInt(p.Elapsed, 10),
  725. "{{Protocol}}", p.Protocol,
  726. "{{IP}}", p.IP,
  727. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  728. "{{Email}}", p.getStringReplacement(p.Email, jsonEscaped),
  729. "{{Timestamp}}", strconv.FormatInt(p.Timestamp.UnixNano(), 10),
  730. "{{DateTime}}", p.Timestamp.UTC().Format(dateTimeMillisFormat),
  731. "{{StatusString}}", p.getStatusString(),
  732. "{{UID}}", p.getStringReplacement(p.UID, jsonEscaped),
  733. "{{Ext}}", p.getStringReplacement(p.Extension, jsonEscaped),
  734. }
  735. if p.VirtualPath != "" {
  736. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  737. }
  738. if p.VirtualTargetPath != "" {
  739. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  740. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  741. }
  742. if len(p.errors) > 0 {
  743. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  744. } else {
  745. replacements = append(replacements, "{{ErrorString}}", "")
  746. }
  747. replacements = append(replacements, objDataPlaceholder, "{}")
  748. replacements = append(replacements, objDataPlaceholderString, "")
  749. if addObjectData {
  750. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  751. if err == nil {
  752. dataString := util.BytesToString(data)
  753. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  754. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  755. }
  756. }
  757. if p.IDPCustomFields != nil {
  758. for k, v := range *p.IDPCustomFields {
  759. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  760. }
  761. }
  762. replacements = append(replacements, "{{Metadata}}", "{}")
  763. replacements = append(replacements, "{{MetadataString}}", "")
  764. if len(p.Metadata) > 0 {
  765. data, err := json.Marshal(p.Metadata)
  766. if err == nil {
  767. dataString := util.BytesToString(data)
  768. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  769. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  770. }
  771. }
  772. return replacements
  773. }
  774. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  775. var b bytes.Buffer
  776. csvWriter := csv.NewWriter(&b)
  777. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  778. "elapsed (ms)", "info", "error"})
  779. if err != nil {
  780. return nil, err
  781. }
  782. for _, result := range results {
  783. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  784. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  785. result.Info, result.Error})
  786. if err != nil {
  787. return nil, err
  788. }
  789. }
  790. csvWriter.Flush()
  791. err = csvWriter.Error()
  792. return b.Bytes(), err
  793. }
// closeWriterAndUpdateQuota closes w, then updates the quota and executes the
// action notification for the file that was written.
//
// The written file is virtualTargetPath if set, virtualSourcePath otherwise.
// If a target path is set and errTransfer is not nil, the removal of the
// partial target file is attempted. The quota is updated using the size of the
// written file minus truncatedSize; if the file cannot be stat-ed a warning is
// logged and neither the quota update nor the notification happen.
//
// It returns errTransfer if not nil, otherwise the error from closing w.
func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
	numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
) error {
	var fsDstPath string
	var errDstFs error
	// the writer is always closed, whatever the transfer result is
	errWrite := w.Close()
	targetPath := virtualSourcePath
	if virtualTargetPath != "" {
		targetPath = virtualTargetPath
		var fsDst vfs.Fs
		fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
		if errTransfer != nil && errDstFs == nil {
			// try to remove a partial file on error. If this fails, we can't do anything
			errRemove := fsDst.Remove(fsDstPath, false)
			conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
		}
	}
	info, err := conn.doStatInternal(targetPath, 0, false, false)
	if err == nil {
		updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
		var fsSrcPath string
		var errSrcFs error
		if virtualSourcePath != "" {
			_, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
		}
		// log and notify only if both the source and the target paths were resolved
		if errSrcFs == nil && errDstFs == nil {
			// elapsed time in milliseconds
			elapsed := time.Since(startTime).Nanoseconds() / 1000000
			// a close error is reported as transfer error if the transfer itself succeeded
			if errTransfer == nil {
				errTransfer = errWrite
			}
			if operation == operationCopy {
				logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
					"", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
			}
			ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed, nil) //nolint:errcheck
		}
	} else {
		eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
	}
	if errTransfer != nil {
		return errTransfer
	}
	return errWrite
}
  838. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  839. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  840. if err != nil {
  841. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  842. return
  843. }
  844. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  845. if vfolder.IsIncludedInUserQuota() {
  846. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  847. }
  848. }
  849. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  850. if numFiles == 0 {
  851. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  852. return conn.GetPermissionDeniedError()
  853. }
  854. } else {
  855. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  856. return conn.GetPermissionDeniedError()
  857. }
  858. }
  859. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  860. if !q.HasSpace {
  861. return conn.GetQuotaExceededError()
  862. }
  863. if expectedSize != -1 {
  864. sizeDiff := expectedSize - truncatedSize
  865. if sizeDiff > 0 {
  866. remainingSize := q.GetRemainingSize()
  867. if remainingSize > 0 && remainingSize < sizeDiff {
  868. return conn.GetQuotaExceededError()
  869. }
  870. }
  871. }
  872. return nil
  873. }
  874. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  875. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  876. if err != nil {
  877. return nil, 0, 0, nil, err
  878. }
  879. var truncatedSize, fileSize int64
  880. numFiles := 1
  881. isFileOverwrite := false
  882. info, err := fs.Lstat(fsPath)
  883. if err == nil {
  884. fileSize = info.Size()
  885. if info.IsDir() {
  886. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  887. }
  888. if info.Mode().IsRegular() {
  889. isFileOverwrite = true
  890. truncatedSize = fileSize
  891. }
  892. numFiles = 0
  893. }
  894. if err != nil && !fs.IsNotExist(err) {
  895. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  896. }
  897. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  898. return nil, numFiles, truncatedSize, nil, err
  899. }
  900. f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1, false))
  901. if err != nil {
  902. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  903. }
  904. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  905. if isFileOverwrite {
  906. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  907. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  908. truncatedSize = 0
  909. }
  910. }
  911. if cancelFn == nil {
  912. cancelFn = func() {}
  913. }
  914. if f != nil {
  915. return f, numFiles, truncatedSize, cancelFn, nil
  916. }
  917. return w, numFiles, truncatedSize, cancelFn, nil
  918. }
// addZipEntry adds entryPath to the zip archive handled by wr.
//
// The entry name inside the archive is entryPath made relative to baseDir.
// The archive itself, entries whose name was already added and non regular
// files are skipped without returning an error. For a directory a header
// ending with "/" is created and then its contents are added recursively.
// recursion is the current nesting level: util.ErrRecursionTooDeep is
// returned once it reaches util.MaxRecursion.
func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string, recursion int) error {
	if entryPath == wr.Name {
		// skip the archive itself
		return nil
	}
	if recursion >= util.MaxRecursion {
		eventManagerLog(logger.LevelError, "unable to add zip entry %q, recursion too deep: %v", entryPath, recursion)
		return util.ErrRecursionTooDeep
	}
	recursion++
	info, err := conn.DoStat(entryPath, 1, false)
	if err != nil {
		eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
		return err
	}
	entryName, err := getZipEntryName(entryPath, baseDir)
	if err != nil {
		eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
		return err
	}
	// track the names already added so that each one is written only once
	if _, ok := wr.Entries[entryName]; ok {
		eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
		return nil
	}
	wr.Entries[entryName] = true
	if info.IsDir() {
		_, err = wr.Writer.CreateHeader(&zip.FileHeader{
			Name:     entryName + "/",
			Method:   zip.Deflate,
			Modified: info.ModTime(),
		})
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
			return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
		}
		lister, err := conn.ListDir(entryPath)
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to add zip entry %q, get dir lister error: %v", entryPath, err)
			return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
		}
		defer lister.Close()
		// the directory is read in batches, this loop always ends with a return
		for {
			contents, err := lister.Next(vfs.ListerBatchSize)
			// io.EOF marks the last batch, its contents are still processed below
			finished := errors.Is(err, io.EOF)
			// NOTE(review): convertError presumably returns nil for io.EOF - confirm
			if err := lister.convertError(err); err != nil {
				eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
				return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
			}
			for _, info := range contents {
				fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
				if err := addZipEntry(wr, conn, fullPath, baseDir, recursion); err != nil {
					eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
					return err
				}
			}
			if finished {
				return nil
			}
		}
	}
	if !info.Mode().IsRegular() {
		// we only allow regular files
		eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
		return nil
	}
	return addFileToZip(wr, conn, entryPath, entryName, info.ModTime())
}
  986. func addFileToZip(wr *zipWriterWrapper, conn *BaseConnection, entryPath, entryName string, modTime time.Time) error {
  987. reader, cancelFn, err := getFileReader(conn, entryPath)
  988. if err != nil {
  989. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  990. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  991. }
  992. defer cancelFn()
  993. defer reader.Close()
  994. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  995. Name: entryName,
  996. Method: zip.Deflate,
  997. Modified: modTime,
  998. })
  999. if err != nil {
  1000. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  1001. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  1002. }
  1003. _, err = io.Copy(f, reader)
  1004. return err
  1005. }
  1006. func getZipEntryName(entryPath, baseDir string) (string, error) {
  1007. if !strings.HasPrefix(entryPath, baseDir) {
  1008. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  1009. }
  1010. entryPath = strings.TrimPrefix(entryPath, baseDir)
  1011. return strings.TrimPrefix(entryPath, "/"), nil
  1012. }
  1013. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  1014. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  1015. return nil, nil, conn.GetPermissionDeniedError()
  1016. }
  1017. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  1018. if err != nil {
  1019. return nil, nil, err
  1020. }
  1021. f, r, cancelFn, err := fs.Open(fsPath, 0)
  1022. if err != nil {
  1023. return nil, nil, conn.GetFsError(fs, err)
  1024. }
  1025. if cancelFn == nil {
  1026. cancelFn = func() {}
  1027. }
  1028. if f != nil {
  1029. return f, cancelFn, nil
  1030. }
  1031. return r, cancelFn, nil
  1032. }
  1033. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  1034. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1035. if err != nil {
  1036. return err
  1037. }
  1038. defer cancelFn()
  1039. defer reader.Close()
  1040. _, err = io.Copy(w, reader)
  1041. return err
  1042. }
  1043. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  1044. return func(w io.Writer) (int64, error) {
  1045. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1046. if err != nil {
  1047. return 0, err
  1048. }
  1049. defer cancelFn()
  1050. defer reader.Close()
  1051. return io.CopyN(w, reader, size)
  1052. }
  1053. }
  1054. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  1055. var files []*mail.File
  1056. totalSize := int64(0)
  1057. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  1058. info, err := conn.DoStat(virtualPath, 0, false)
  1059. if err != nil {
  1060. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  1061. }
  1062. if !info.Mode().IsRegular() {
  1063. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  1064. }
  1065. totalSize += info.Size()
  1066. if totalSize > maxAttachmentsSize {
  1067. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  1068. }
  1069. files = append(files, &mail.File{
  1070. Name: path.Base(virtualPath),
  1071. Header: make(map[string][]string),
  1072. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1073. })
  1074. }
  1075. return files, nil
  1076. }
  1077. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  1078. if !strings.Contains(input, "{{") {
  1079. return input
  1080. }
  1081. return replacer.Replace(input)
  1082. }
  1083. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1084. var matched bool
  1085. var err error
  1086. if strings.Contains(p.Pattern, "**") {
  1087. matched, err = doublestar.Match(p.Pattern, name)
  1088. } else {
  1089. matched, err = path.Match(p.Pattern, name)
  1090. }
  1091. if err != nil {
  1092. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1093. return false
  1094. }
  1095. if p.InverseMatch {
  1096. return !matched
  1097. }
  1098. return matched
  1099. }
  1100. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1101. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1102. return false
  1103. }
  1104. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1105. return false
  1106. }
  1107. if !checkEventGroupConditionPatterns(user.Groups, conditions.GroupNames) {
  1108. return false
  1109. }
  1110. return true
  1111. }
  1112. // checkEventConditionPatterns returns false if patterns are defined and no match is found
  1113. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1114. if len(patterns) == 0 {
  1115. return true
  1116. }
  1117. matches := false
  1118. for _, p := range patterns {
  1119. // assume, that multiple InverseMatches are set
  1120. if p.InverseMatch {
  1121. if checkEventConditionPattern(p, name) {
  1122. matches = true
  1123. } else {
  1124. return false
  1125. }
  1126. } else if checkEventConditionPattern(p, name) {
  1127. return true
  1128. }
  1129. }
  1130. return matches
  1131. }
  1132. func checkEventGroupConditionPatterns(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1133. if len(patterns) == 0 {
  1134. return true
  1135. }
  1136. matches := false
  1137. for _, group := range groups {
  1138. for _, p := range patterns {
  1139. // assume, that multiple InverseMatches are set
  1140. if p.InverseMatch {
  1141. if checkEventConditionPattern(p, group.Name) {
  1142. matches = true
  1143. } else {
  1144. return false
  1145. }
  1146. } else {
  1147. if checkEventConditionPattern(p, group.Name) {
  1148. return true
  1149. }
  1150. }
  1151. }
  1152. }
  1153. return matches
  1154. }
  1155. func getHTTPRuleActionEndpoint(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1156. u, err := url.Parse(c.Endpoint)
  1157. if err != nil {
  1158. return "", fmt.Errorf("invalid endpoint: %w", err)
  1159. }
  1160. if strings.Contains(u.Path, "{{") {
  1161. pathComponents := strings.Split(u.Path, "/")
  1162. for idx := range pathComponents {
  1163. part := replaceWithReplacer(pathComponents[idx], replacer)
  1164. if part != pathComponents[idx] {
  1165. pathComponents[idx] = url.PathEscape(part)
  1166. }
  1167. }
  1168. u.Path = ""
  1169. u = u.JoinPath(pathComponents...)
  1170. }
  1171. if len(c.QueryParameters) > 0 {
  1172. q := u.Query()
  1173. for _, keyVal := range c.QueryParameters {
  1174. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1175. }
  1176. u.RawQuery = q.Encode()
  1177. }
  1178. return u.String(), nil
  1179. }
  1180. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1181. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1182. ) error {
  1183. partWriter, err := m.CreatePart(h)
  1184. if err != nil {
  1185. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1186. return err
  1187. }
  1188. if part.Body != "" {
  1189. cType := h.Get("Content-Type")
  1190. if strings.Contains(strings.ToLower(cType), "application/json") {
  1191. replacements := params.getStringReplacements(addObjectData, true)
  1192. jsonReplacer := strings.NewReplacer(replacements...)
  1193. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, jsonReplacer)))
  1194. } else {
  1195. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, replacer)))
  1196. }
  1197. if err != nil {
  1198. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1199. return err
  1200. }
  1201. return nil
  1202. }
  1203. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1204. data, err := params.getCompressedDataRetentionReport()
  1205. if err != nil {
  1206. return err
  1207. }
  1208. _, err = partWriter.Write(data)
  1209. if err != nil {
  1210. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1211. return err
  1212. }
  1213. return nil
  1214. }
  1215. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1216. if err != nil {
  1217. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1218. return err
  1219. }
  1220. return nil
  1221. }
  1222. func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1223. cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
  1224. ) (io.Reader, string, error) {
  1225. var body io.Reader
  1226. if c.Method == http.MethodGet {
  1227. return body, "", nil
  1228. }
  1229. if c.Body != "" {
  1230. if c.Body == dataprovider.RetentionReportPlaceHolder {
  1231. data, err := params.getCompressedDataRetentionReport()
  1232. if err != nil {
  1233. return body, "", err
  1234. }
  1235. return bytes.NewBuffer(data), "", nil
  1236. }
  1237. if c.HasJSONBody() {
  1238. replacements := params.getStringReplacements(addObjectData, true)
  1239. jsonReplacer := strings.NewReplacer(replacements...)
  1240. return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
  1241. }
  1242. return bytes.NewBufferString(replaceWithReplacer(c.Body, replacer)), "", nil
  1243. }
  1244. if len(c.Parts) > 0 {
  1245. r, w := io.Pipe()
  1246. m := multipart.NewWriter(w)
  1247. var conn *BaseConnection
  1248. if user.Username != "" {
  1249. var err error
  1250. user, err = getUserForEventAction(user)
  1251. if err != nil {
  1252. return body, "", err
  1253. }
  1254. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1255. err = user.CheckFsRoot(connectionID)
  1256. if err != nil {
  1257. user.CloseFs() //nolint:errcheck
  1258. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  1259. user.Username, err)
  1260. }
  1261. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1262. }
  1263. go func() {
  1264. defer w.Close()
  1265. defer user.CloseFs() //nolint:errcheck
  1266. for _, part := range c.Parts {
  1267. h := make(textproto.MIMEHeader)
  1268. if part.Body != "" {
  1269. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  1270. } else {
  1271. h.Set("Content-Disposition",
  1272. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  1273. multipartQuoteEscaper.Replace(part.Name),
  1274. multipartQuoteEscaper.Replace((path.Base(replaceWithReplacer(part.Filepath, replacer))))))
  1275. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  1276. if contentType == "" {
  1277. contentType = "application/octet-stream"
  1278. }
  1279. h.Set("Content-Type", contentType)
  1280. }
  1281. for _, keyVal := range part.Headers {
  1282. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1283. }
  1284. if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
  1285. cancel()
  1286. return
  1287. }
  1288. }
  1289. m.Close()
  1290. }()
  1291. return r, m.FormDataContentType(), nil
  1292. }
  1293. return body, "", nil
  1294. }
  1295. func setHTTPReqHeaders(req *http.Request, c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1296. contentType string,
  1297. ) {
  1298. if contentType != "" {
  1299. req.Header.Set("Content-Type", contentType)
  1300. }
  1301. if c.Username != "" || c.Password.GetPayload() != "" {
  1302. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1303. }
  1304. for _, keyVal := range c.Headers {
  1305. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1306. }
  1307. }
// executeHTTPRuleAction sends the HTTP request defined by the action config c,
// after replacing the placeholders using the event params.
// An error is returned if the request cannot be built or sent, or if the
// response status code is lower than 200 (http.StatusOK) or greater than 204
// (http.StatusNoContent).
func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
	if err := c.TryDecryptPassword(); err != nil {
		return err
	}
	// the object data are added only if an object is available
	addObjectData := false
	if params.Object != nil {
		addObjectData = c.HasObjectData()
	}
	replacements := params.getStringReplacements(addObjectData, false)
	replacer := strings.NewReplacer(replacements...)
	endpoint, err := getHTTPRuleActionEndpoint(&c, replacer)
	if err != nil {
		return err
	}
	ctx, cancel := c.GetContext()
	defer cancel()
	// the user is looked up only when multipart files have to be sent
	var user dataprovider.User
	if c.HasMultipartFiles() {
		user, err = params.getUserFromSender()
		if err != nil {
			return err
		}
	}
	// cancel is passed so that the request can be aborted if writing a multipart part fails
	body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
	if err != nil {
		return err
	}
	if body != nil {
		// close the body on return if it is closable, as the pipe reader used for multipart bodies
		rc, ok := body.(io.ReadCloser)
		if ok {
			defer rc.Close()
		}
	}
	req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
	if err != nil {
		return err
	}
	setHTTPReqHeaders(req, &c, replacer, contentType)
	client := c.GetHTTPClient()
	defer client.CloseIdleConnections()
	startTime := time.Now()
	resp, err := client.Do(req)
	if err != nil {
		eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
			endpoint, time.Since(startTime), err)
		return fmt.Errorf("error sending HTTP request: %w", err)
	}
	defer resp.Body.Close()
	eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
		endpoint, time.Since(startTime), resp.StatusCode)
	if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
		// log at most the first 2048 bytes of the error response
		if rb, err := io.ReadAll(io.LimitReader(resp.Body, 2048)); err == nil {
			eventManagerLog(logger.LevelDebug, "error notification response from endpoint %q: %s",
				endpoint, util.BytesToString(rb))
		}
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	return nil
}
  1367. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1368. if !dataprovider.IsActionCommandAllowed(c.Cmd) {
  1369. return fmt.Errorf("command %q is not allowed", c.Cmd)
  1370. }
  1371. addObjectData := false
  1372. if params.Object != nil {
  1373. for _, k := range c.EnvVars {
  1374. if strings.Contains(k.Value, objDataPlaceholder) || strings.Contains(k.Value, objDataPlaceholderString) {
  1375. addObjectData = true
  1376. break
  1377. }
  1378. }
  1379. }
  1380. replacements := params.getStringReplacements(addObjectData, false)
  1381. replacer := strings.NewReplacer(replacements...)
  1382. args := make([]string, 0, len(c.Args))
  1383. for _, arg := range c.Args {
  1384. args = append(args, replaceWithReplacer(arg, replacer))
  1385. }
  1386. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1387. defer cancel()
  1388. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1389. cmd.Env = []string{}
  1390. for _, keyVal := range c.EnvVars {
  1391. if keyVal.Value == "$" {
  1392. val := os.Getenv(keyVal.Key)
  1393. if val == "" {
  1394. eventManagerLog(logger.LevelDebug, "empty value for environment variable %q", keyVal.Key)
  1395. }
  1396. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, val))
  1397. } else {
  1398. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1399. }
  1400. }
  1401. startTime := time.Now()
  1402. err := cmd.Run()
  1403. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1404. c.Cmd, time.Since(startTime), err)
  1405. return err
  1406. }
  1407. func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) []string {
  1408. if len(addrs) == 0 {
  1409. return nil
  1410. }
  1411. recipients := make([]string, 0, len(addrs))
  1412. for _, recipient := range addrs {
  1413. rcpt := replaceWithReplacer(recipient, replacer)
  1414. if rcpt != "" {
  1415. recipients = append(recipients, rcpt)
  1416. }
  1417. }
  1418. return recipients
  1419. }
  1420. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1421. addObjectData := false
  1422. if params.Object != nil {
  1423. if strings.Contains(c.Body, objDataPlaceholder) || strings.Contains(c.Body, objDataPlaceholderString) {
  1424. addObjectData = true
  1425. }
  1426. }
  1427. replacements := params.getStringReplacements(addObjectData, false)
  1428. replacer := strings.NewReplacer(replacements...)
  1429. body := replaceWithReplacer(c.Body, replacer)
  1430. subject := replaceWithReplacer(c.Subject, replacer)
  1431. recipients := getEmailAddressesWithReplacer(c.Recipients, replacer)
  1432. bcc := getEmailAddressesWithReplacer(c.Bcc, replacer)
  1433. startTime := time.Now()
  1434. var files []*mail.File
  1435. fileAttachments := make([]string, 0, len(c.Attachments))
  1436. for _, attachment := range c.Attachments {
  1437. if attachment == dataprovider.RetentionReportPlaceHolder {
  1438. f, err := params.getRetentionReportsAsMailAttachment()
  1439. if err != nil {
  1440. return err
  1441. }
  1442. files = append(files, f)
  1443. continue
  1444. }
  1445. fileAttachments = append(fileAttachments, attachment)
  1446. }
  1447. if len(fileAttachments) > 0 {
  1448. user, err := params.getUserFromSender()
  1449. if err != nil {
  1450. return err
  1451. }
  1452. user, err = getUserForEventAction(user)
  1453. if err != nil {
  1454. return err
  1455. }
  1456. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1457. err = user.CheckFsRoot(connectionID)
  1458. defer user.CloseFs() //nolint:errcheck
  1459. if err != nil {
  1460. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1461. }
  1462. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1463. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1464. if err != nil {
  1465. return err
  1466. }
  1467. files = append(files, res...)
  1468. }
  1469. err := smtp.SendEmail(recipients, bcc, subject, body, smtp.EmailContentType(c.ContentType), files...)
  1470. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1471. time.Since(startTime), err)
  1472. if err != nil {
  1473. return fmt.Errorf("unable to send email: %w", err)
  1474. }
  1475. return nil
  1476. }
  1477. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1478. err := user.LoadAndApplyGroupSettings()
  1479. if err != nil {
  1480. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1481. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1482. }
  1483. user.UploadDataTransfer = 0
  1484. user.UploadBandwidth = 0
  1485. user.DownloadBandwidth = 0
  1486. user.Filters.DisableFsChecks = false
  1487. user.Filters.FilePatterns = nil
  1488. user.Filters.BandwidthLimits = nil
  1489. for k := range user.Permissions {
  1490. user.Permissions[k] = []string{dataprovider.PermAny}
  1491. }
  1492. return user, nil
  1493. }
  1494. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1495. results := make([]string, 0, len(paths))
  1496. for _, p := range paths {
  1497. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1498. }
  1499. return util.RemoveDuplicates(results, false)
  1500. }
  1501. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1502. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1503. if err != nil {
  1504. return err
  1505. }
  1506. return conn.RemoveFile(fs, fsPath, item, info)
  1507. }
  1508. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1509. user, err := getUserForEventAction(user)
  1510. if err != nil {
  1511. return err
  1512. }
  1513. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1514. err = user.CheckFsRoot(connectionID)
  1515. defer user.CloseFs() //nolint:errcheck
  1516. if err != nil {
  1517. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1518. }
  1519. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1520. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1521. info, err := conn.DoStat(item, 0, false)
  1522. if err != nil {
  1523. if conn.IsNotExistError(err) {
  1524. continue
  1525. }
  1526. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1527. }
  1528. if info.IsDir() {
  1529. if err = conn.RemoveDir(item); err != nil {
  1530. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1531. }
  1532. } else {
  1533. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1534. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1535. }
  1536. }
  1537. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1538. }
  1539. return nil
  1540. }
  1541. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1542. conditions dataprovider.ConditionOptions, params *EventParams,
  1543. ) error {
  1544. users, err := params.getUsers()
  1545. if err != nil {
  1546. return fmt.Errorf("unable to get users: %w", err)
  1547. }
  1548. var failures []string
  1549. executed := 0
  1550. for _, user := range users {
  1551. // if sender is set, the conditions have already been evaluated
  1552. if params.sender == "" {
  1553. if !checkUserConditionOptions(&user, &conditions) {
  1554. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1555. user.Username)
  1556. continue
  1557. }
  1558. }
  1559. executed++
  1560. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1561. params.AddError(err)
  1562. failures = append(failures, user.Username)
  1563. }
  1564. }
  1565. if len(failures) > 0 {
  1566. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1567. }
  1568. if executed == 0 {
  1569. eventManagerLog(logger.LevelError, "no delete executed")
  1570. return errors.New("no delete executed")
  1571. }
  1572. return nil
  1573. }
  1574. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1575. user, err := getUserForEventAction(user)
  1576. if err != nil {
  1577. return err
  1578. }
  1579. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1580. err = user.CheckFsRoot(connectionID)
  1581. defer user.CloseFs() //nolint:errcheck
  1582. if err != nil {
  1583. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1584. }
  1585. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1586. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1587. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1588. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1589. }
  1590. if err = conn.createDirIfMissing(item); err != nil {
  1591. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1592. }
  1593. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1594. }
  1595. return nil
  1596. }
  1597. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1598. conditions dataprovider.ConditionOptions, params *EventParams,
  1599. ) error {
  1600. users, err := params.getUsers()
  1601. if err != nil {
  1602. return fmt.Errorf("unable to get users: %w", err)
  1603. }
  1604. var failures []string
  1605. executed := 0
  1606. for _, user := range users {
  1607. // if sender is set, the conditions have already been evaluated
  1608. if params.sender == "" {
  1609. if !checkUserConditionOptions(&user, &conditions) {
  1610. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1611. user.Username)
  1612. continue
  1613. }
  1614. }
  1615. executed++
  1616. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1617. failures = append(failures, user.Username)
  1618. }
  1619. }
  1620. if len(failures) > 0 {
  1621. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1622. }
  1623. if executed == 0 {
  1624. eventManagerLog(logger.LevelError, "no mkdir executed")
  1625. return errors.New("no mkdir executed")
  1626. }
  1627. return nil
  1628. }
  1629. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1630. user dataprovider.User,
  1631. ) error {
  1632. user, err := getUserForEventAction(user)
  1633. if err != nil {
  1634. return err
  1635. }
  1636. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1637. err = user.CheckFsRoot(connectionID)
  1638. defer user.CloseFs() //nolint:errcheck
  1639. if err != nil {
  1640. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1641. }
  1642. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1643. for _, item := range renames {
  1644. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1645. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1646. if err = conn.renameInternal(source, target, true); err != nil {
  1647. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1648. }
  1649. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1650. }
  1651. return nil
  1652. }
  1653. func executeCopyFsActionForUser(keyVals []dataprovider.KeyValue, replacer *strings.Replacer,
  1654. user dataprovider.User,
  1655. ) error {
  1656. user, err := getUserForEventAction(user)
  1657. if err != nil {
  1658. return err
  1659. }
  1660. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1661. err = user.CheckFsRoot(connectionID)
  1662. defer user.CloseFs() //nolint:errcheck
  1663. if err != nil {
  1664. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1665. }
  1666. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1667. for _, item := range keyVals {
  1668. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1669. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1670. if strings.HasSuffix(item.Key, "/") {
  1671. source += "/"
  1672. }
  1673. if strings.HasSuffix(item.Value, "/") {
  1674. target += "/"
  1675. }
  1676. if err = conn.Copy(source, target); err != nil {
  1677. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1678. }
  1679. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1680. }
  1681. return nil
  1682. }
  1683. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1684. user dataprovider.User,
  1685. ) error {
  1686. user, err := getUserForEventAction(user)
  1687. if err != nil {
  1688. return err
  1689. }
  1690. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1691. err = user.CheckFsRoot(connectionID)
  1692. defer user.CloseFs() //nolint:errcheck
  1693. if err != nil {
  1694. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1695. }
  1696. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1697. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1698. if _, err = conn.DoStat(item, 0, false); err != nil {
  1699. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1700. }
  1701. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1702. }
  1703. return nil
  1704. }
  1705. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1706. conditions dataprovider.ConditionOptions, params *EventParams,
  1707. ) error {
  1708. users, err := params.getUsers()
  1709. if err != nil {
  1710. return fmt.Errorf("unable to get users: %w", err)
  1711. }
  1712. var failures []string
  1713. executed := 0
  1714. for _, user := range users {
  1715. // if sender is set, the conditions have already been evaluated
  1716. if params.sender == "" {
  1717. if !checkUserConditionOptions(&user, &conditions) {
  1718. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1719. user.Username)
  1720. continue
  1721. }
  1722. }
  1723. executed++
  1724. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1725. failures = append(failures, user.Username)
  1726. params.AddError(err)
  1727. }
  1728. }
  1729. if len(failures) > 0 {
  1730. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1731. }
  1732. if executed == 0 {
  1733. eventManagerLog(logger.LevelError, "no rename executed")
  1734. return errors.New("no rename executed")
  1735. }
  1736. return nil
  1737. }
  1738. func executeCopyFsRuleAction(keyVals []dataprovider.KeyValue, replacer *strings.Replacer,
  1739. conditions dataprovider.ConditionOptions, params *EventParams,
  1740. ) error {
  1741. users, err := params.getUsers()
  1742. if err != nil {
  1743. return fmt.Errorf("unable to get users: %w", err)
  1744. }
  1745. var failures []string
  1746. var executed int
  1747. for _, user := range users {
  1748. // if sender is set, the conditions have already been evaluated
  1749. if params.sender == "" {
  1750. if !checkUserConditionOptions(&user, &conditions) {
  1751. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1752. user.Username)
  1753. continue
  1754. }
  1755. }
  1756. executed++
  1757. if err = executeCopyFsActionForUser(keyVals, replacer, user); err != nil {
  1758. failures = append(failures, user.Username)
  1759. params.AddError(err)
  1760. }
  1761. }
  1762. if len(failures) > 0 {
  1763. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1764. }
  1765. if executed == 0 {
  1766. eventManagerLog(logger.LevelError, "no copy executed")
  1767. return errors.New("no copy executed")
  1768. }
  1769. return nil
  1770. }
  1771. func getArchiveBaseDir(paths []string) string {
  1772. var parentDirs []string
  1773. for _, p := range paths {
  1774. parentDirs = append(parentDirs, path.Dir(p))
  1775. }
  1776. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1777. baseDir := "/"
  1778. if len(parentDirs) == 1 {
  1779. baseDir = parentDirs[0]
  1780. }
  1781. return baseDir
  1782. }
  1783. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1784. if info.IsDir() {
  1785. var dirSize int64
  1786. lister, err := conn.ListDir(p)
  1787. if err != nil {
  1788. return 0, err
  1789. }
  1790. defer lister.Close()
  1791. for {
  1792. entries, err := lister.Next(vfs.ListerBatchSize)
  1793. finished := errors.Is(err, io.EOF)
  1794. if err != nil && !finished {
  1795. return 0, err
  1796. }
  1797. for _, entry := range entries {
  1798. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1799. if err != nil {
  1800. return 0, err
  1801. }
  1802. dirSize += size
  1803. }
  1804. if finished {
  1805. return dirSize, nil
  1806. }
  1807. }
  1808. }
  1809. if info.Mode().IsRegular() {
  1810. return info.Size(), nil
  1811. }
  1812. return 0, nil
  1813. }
  1814. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1815. q, _ := conn.HasSpace(false, false, zipPath)
  1816. if q.HasSpace && q.GetRemainingSize() > 0 {
  1817. var size int64
  1818. for _, item := range paths {
  1819. info, err := conn.DoStat(item, 1, false)
  1820. if err != nil {
  1821. return size, err
  1822. }
  1823. itemSize, err := getSizeForPath(conn, item, info)
  1824. if err != nil {
  1825. return size, err
  1826. }
  1827. size += itemSize
  1828. }
  1829. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1830. // we assume the zip size will be half of the real size
  1831. return size / 2, nil
  1832. }
  1833. return -1, nil
  1834. }
  1835. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1836. user dataprovider.User,
  1837. ) error {
  1838. user, err := getUserForEventAction(user)
  1839. if err != nil {
  1840. return err
  1841. }
  1842. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1843. err = user.CheckFsRoot(connectionID)
  1844. defer user.CloseFs() //nolint:errcheck
  1845. if err != nil {
  1846. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1847. }
  1848. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1849. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1850. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1851. paths := make([]string, 0, len(c.Paths))
  1852. for idx := range c.Paths {
  1853. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1854. if p == name {
  1855. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1856. }
  1857. paths = append(paths, p)
  1858. }
  1859. paths = util.RemoveDuplicates(paths, false)
  1860. estimatedSize, err := estimateZipSize(conn, name, paths)
  1861. if err != nil {
  1862. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1863. return fmt.Errorf("unable to estimate archive size: %w", err)
  1864. }
  1865. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1866. if err != nil {
  1867. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1868. return fmt.Errorf("unable to create archive: %w", err)
  1869. }
  1870. defer cancelFn()
  1871. baseDir := getArchiveBaseDir(paths)
  1872. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1873. zipWriter := &zipWriterWrapper{
  1874. Name: name,
  1875. Writer: zip.NewWriter(writer),
  1876. Entries: make(map[string]bool),
  1877. }
  1878. startTime := time.Now()
  1879. for _, item := range paths {
  1880. if err := addZipEntry(zipWriter, conn, item, baseDir, 0); err != nil {
  1881. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1882. return err
  1883. }
  1884. }
  1885. if err := zipWriter.Writer.Close(); err != nil {
  1886. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1887. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1888. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1889. }
  1890. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1891. }
  1892. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1893. params *EventParams,
  1894. ) error {
  1895. users, err := params.getUsers()
  1896. if err != nil {
  1897. return fmt.Errorf("unable to get users: %w", err)
  1898. }
  1899. var failures []string
  1900. executed := 0
  1901. for _, user := range users {
  1902. // if sender is set, the conditions have already been evaluated
  1903. if params.sender == "" {
  1904. if !checkUserConditionOptions(&user, &conditions) {
  1905. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1906. user.Username)
  1907. continue
  1908. }
  1909. }
  1910. executed++
  1911. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1912. failures = append(failures, user.Username)
  1913. params.AddError(err)
  1914. }
  1915. }
  1916. if len(failures) > 0 {
  1917. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1918. }
  1919. if executed == 0 {
  1920. eventManagerLog(logger.LevelError, "no existence check executed")
  1921. return errors.New("no existence check executed")
  1922. }
  1923. return nil
  1924. }
  1925. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1926. conditions dataprovider.ConditionOptions, params *EventParams,
  1927. ) error {
  1928. users, err := params.getUsers()
  1929. if err != nil {
  1930. return fmt.Errorf("unable to get users: %w", err)
  1931. }
  1932. var failures []string
  1933. executed := 0
  1934. for _, user := range users {
  1935. // if sender is set, the conditions have already been evaluated
  1936. if params.sender == "" {
  1937. if !checkUserConditionOptions(&user, &conditions) {
  1938. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1939. user.Username)
  1940. continue
  1941. }
  1942. }
  1943. executed++
  1944. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1945. failures = append(failures, user.Username)
  1946. params.AddError(err)
  1947. }
  1948. }
  1949. if len(failures) > 0 {
  1950. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1951. }
  1952. if executed == 0 {
  1953. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1954. return errors.New("no file/folder compressed")
  1955. }
  1956. return nil
  1957. }
  1958. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1959. params *EventParams,
  1960. ) error {
  1961. addObjectData := false
  1962. replacements := params.getStringReplacements(addObjectData, false)
  1963. replacer := strings.NewReplacer(replacements...)
  1964. switch c.Type {
  1965. case dataprovider.FilesystemActionRename:
  1966. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1967. case dataprovider.FilesystemActionDelete:
  1968. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1969. case dataprovider.FilesystemActionMkdirs:
  1970. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1971. case dataprovider.FilesystemActionExist:
  1972. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1973. case dataprovider.FilesystemActionCompress:
  1974. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1975. case dataprovider.FilesystemActionCopy:
  1976. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1977. default:
  1978. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1979. }
  1980. }
  1981. func executeQuotaResetForUser(user *dataprovider.User) error {
  1982. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1983. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1984. user.Username, err)
  1985. return err
  1986. }
  1987. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1988. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1989. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1990. }
  1991. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1992. numFiles, size, err := user.ScanQuota()
  1993. if err != nil {
  1994. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1995. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1996. }
  1997. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1998. if err != nil {
  1999. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  2000. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  2001. }
  2002. return nil
  2003. }
  2004. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2005. users, err := params.getUsers()
  2006. if err != nil {
  2007. return fmt.Errorf("unable to get users: %w", err)
  2008. }
  2009. var failures []string
  2010. executed := 0
  2011. for _, user := range users {
  2012. // if sender is set, the conditions have already been evaluated
  2013. if params.sender == "" {
  2014. if !checkUserConditionOptions(&user, &conditions) {
  2015. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  2016. user.Username)
  2017. continue
  2018. }
  2019. }
  2020. executed++
  2021. if err = executeQuotaResetForUser(&user); err != nil {
  2022. params.AddError(err)
  2023. failures = append(failures, user.Username)
  2024. }
  2025. }
  2026. if len(failures) > 0 {
  2027. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  2028. }
  2029. if executed == 0 {
  2030. eventManagerLog(logger.LevelError, "no user quota reset executed")
  2031. return errors.New("no user quota reset executed")
  2032. }
  2033. return nil
  2034. }
  2035. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2036. folders, err := params.getFolders()
  2037. if err != nil {
  2038. return fmt.Errorf("unable to get folders: %w", err)
  2039. }
  2040. var failures []string
  2041. executed := 0
  2042. for _, folder := range folders {
  2043. // if sender is set, the conditions have already been evaluated
  2044. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  2045. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  2046. folder.Name)
  2047. continue
  2048. }
  2049. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  2050. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  2051. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  2052. failures = append(failures, folder.Name)
  2053. continue
  2054. }
  2055. executed++
  2056. f := vfs.VirtualFolder{
  2057. BaseVirtualFolder: folder,
  2058. VirtualPath: "/",
  2059. }
  2060. numFiles, size, err := f.ScanQuota()
  2061. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  2062. if err != nil {
  2063. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  2064. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  2065. failures = append(failures, folder.Name)
  2066. continue
  2067. }
  2068. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  2069. if err != nil {
  2070. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  2071. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  2072. failures = append(failures, folder.Name)
  2073. }
  2074. }
  2075. if len(failures) > 0 {
  2076. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  2077. }
  2078. if executed == 0 {
  2079. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  2080. return errors.New("no folder quota reset executed")
  2081. }
  2082. return nil
  2083. }
  2084. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2085. users, err := params.getUsers()
  2086. if err != nil {
  2087. return fmt.Errorf("unable to get users: %w", err)
  2088. }
  2089. var failures []string
  2090. executed := 0
  2091. for _, user := range users {
  2092. // if sender is set, the conditions have already been evaluated
  2093. if params.sender == "" {
  2094. if !checkUserConditionOptions(&user, &conditions) {
  2095. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  2096. user.Username)
  2097. continue
  2098. }
  2099. }
  2100. executed++
  2101. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  2102. if err != nil {
  2103. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  2104. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  2105. failures = append(failures, user.Username)
  2106. }
  2107. }
  2108. if len(failures) > 0 {
  2109. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  2110. }
  2111. if executed == 0 {
  2112. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  2113. return errors.New("no transfer quota reset executed")
  2114. }
  2115. return nil
  2116. }
  2117. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  2118. params *EventParams, actionName string,
  2119. ) error {
  2120. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2121. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  2122. user.Username, err)
  2123. return err
  2124. }
  2125. check := RetentionCheck{
  2126. Folders: folders,
  2127. }
  2128. c := RetentionChecks.Add(check, &user)
  2129. if c == nil {
  2130. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  2131. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2132. }
  2133. defer func() {
  2134. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2135. Username: user.Username,
  2136. ActionName: actionName,
  2137. Results: c.results,
  2138. })
  2139. }()
  2140. if err := c.Start(); err != nil {
  2141. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2142. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2143. }
  2144. return nil
  2145. }
  2146. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2147. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2148. ) error {
  2149. users, err := params.getUsers()
  2150. if err != nil {
  2151. return fmt.Errorf("unable to get users: %w", err)
  2152. }
  2153. var failures []string
  2154. executed := 0
  2155. for _, user := range users {
  2156. // if sender is set, the conditions have already been evaluated
  2157. if params.sender == "" {
  2158. if !checkUserConditionOptions(&user, &conditions) {
  2159. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2160. user.Username)
  2161. continue
  2162. }
  2163. }
  2164. executed++
  2165. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2166. failures = append(failures, user.Username)
  2167. params.AddError(err)
  2168. }
  2169. }
  2170. if len(failures) > 0 {
  2171. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2172. }
  2173. if executed == 0 {
  2174. eventManagerLog(logger.LevelError, "no retention check executed")
  2175. return errors.New("no retention check executed")
  2176. }
  2177. return nil
  2178. }
  2179. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2180. users, err := params.getUsers()
  2181. if err != nil {
  2182. return fmt.Errorf("unable to get users: %w", err)
  2183. }
  2184. var failures []string
  2185. var executed int
  2186. for _, user := range users {
  2187. // if sender is set, the conditions have already been evaluated
  2188. if params.sender == "" {
  2189. if !checkUserConditionOptions(&user, &conditions) {
  2190. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2191. user.Username)
  2192. continue
  2193. }
  2194. }
  2195. executed++
  2196. if user.ExpirationDate > 0 {
  2197. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2198. if expDate.Before(time.Now()) {
  2199. failures = append(failures, user.Username)
  2200. }
  2201. }
  2202. }
  2203. if len(failures) > 0 {
  2204. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2205. }
  2206. if executed == 0 {
  2207. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2208. return errors.New("no user expiration check executed")
  2209. }
  2210. return nil
  2211. }
  2212. func executeInactivityCheckForUser(user *dataprovider.User, config dataprovider.EventActionUserInactivity, when time.Time) error {
  2213. if config.DeleteThreshold > 0 && (user.Status == 0 || config.DisableThreshold == 0) {
  2214. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DeleteThreshold {
  2215. err := dataprovider.DeleteUser(user.Username, dataprovider.ActionExecutorSystem, "", "")
  2216. eventManagerLog(logger.LevelInfo, "deleting inactive user %q, days of inactivity: %d/%d, err: %v",
  2217. user.Username, inactivityDays, config.DeleteThreshold, err)
  2218. if err != nil {
  2219. return fmt.Errorf("unable to delete inactive user %q", user.Username)
  2220. }
  2221. return fmt.Errorf("inactive user %q deleted. Number of days of inactivity: %d", user.Username, inactivityDays)
  2222. }
  2223. }
  2224. if config.DisableThreshold > 0 && user.Status > 0 {
  2225. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DisableThreshold {
  2226. user.Status = 0
  2227. err := dataprovider.UpdateUser(user, dataprovider.ActionExecutorSystem, "", "")
  2228. eventManagerLog(logger.LevelInfo, "disabling inactive user %q, days of inactivity: %d/%d, err: %v",
  2229. user.Username, inactivityDays, config.DisableThreshold, err)
  2230. if err != nil {
  2231. return fmt.Errorf("unable to disable inactive user %q", user.Username)
  2232. }
  2233. return fmt.Errorf("inactive user %q disabled. Number of days of inactivity: %d", user.Username, inactivityDays)
  2234. }
  2235. }
  2236. return nil
  2237. }
  2238. func executeUserInactivityCheckRuleAction(config dataprovider.EventActionUserInactivity,
  2239. conditions dataprovider.ConditionOptions,
  2240. params *EventParams,
  2241. when time.Time,
  2242. ) error {
  2243. users, err := params.getUsers()
  2244. if err != nil {
  2245. return fmt.Errorf("unable to get users: %w", err)
  2246. }
  2247. var failures []string
  2248. for _, user := range users {
  2249. // if sender is set, the conditions have already been evaluated
  2250. if params.sender == "" {
  2251. if !checkUserConditionOptions(&user, &conditions) {
  2252. eventManagerLog(logger.LevelDebug, "skipping inactivity check for user %q, condition options don't match",
  2253. user.Username)
  2254. continue
  2255. }
  2256. }
  2257. if err = executeInactivityCheckForUser(&user, config, when); err != nil {
  2258. params.AddError(err)
  2259. failures = append(failures, user.Username)
  2260. }
  2261. }
  2262. if len(failures) > 0 {
  2263. return fmt.Errorf("executed inactivity check actions for users: %s", strings.Join(failures, ", "))
  2264. }
  2265. return nil
  2266. }
  2267. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2268. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2269. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2270. user.Username, err)
  2271. return err
  2272. }
  2273. if user.ExpirationDate > 0 {
  2274. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2275. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2276. user.Username, expDate)
  2277. return nil
  2278. }
  2279. }
  2280. if user.Filters.PasswordExpiration == 0 {
  2281. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2282. return nil
  2283. }
  2284. days := user.PasswordExpiresIn()
  2285. if days > config.Threshold {
  2286. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2287. user.Username, days, config.Threshold)
  2288. return nil
  2289. }
  2290. body := new(bytes.Buffer)
  2291. data := make(map[string]any)
  2292. data["Username"] = user.Username
  2293. data["Days"] = days
  2294. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2295. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2296. user.Username, err)
  2297. return err
  2298. }
  2299. subject := "SFTPGo password expiration notification"
  2300. startTime := time.Now()
  2301. if err := smtp.SendEmail([]string{user.Email}, nil, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2302. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2303. user.Username, err, time.Since(startTime))
  2304. return err
  2305. }
  2306. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2307. user.Username, days, time.Since(startTime))
  2308. return nil
  2309. }
  2310. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2311. params *EventParams) error {
  2312. users, err := params.getUsers()
  2313. if err != nil {
  2314. return fmt.Errorf("unable to get users: %w", err)
  2315. }
  2316. var failures []string
  2317. for _, user := range users {
  2318. // if sender is set, the conditions have already been evaluated
  2319. if params.sender == "" {
  2320. if !checkUserConditionOptions(&user, &conditions) {
  2321. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2322. user.Username)
  2323. continue
  2324. }
  2325. }
  2326. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2327. params.AddError(err)
  2328. failures = append(failures, user.Username)
  2329. }
  2330. }
  2331. if len(failures) > 0 {
  2332. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2333. }
  2334. return nil
  2335. }
  2336. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2337. admin, err := dataprovider.AdminExists(params.Name)
  2338. exists := err == nil
  2339. if exists && c.Mode == 1 {
  2340. return &admin, nil
  2341. }
  2342. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2343. return nil, err
  2344. }
  2345. replacements := params.getStringReplacements(false, true)
  2346. replacer := strings.NewReplacer(replacements...)
  2347. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2348. var newAdmin dataprovider.Admin
  2349. err = json.Unmarshal(util.StringToBytes(data), &newAdmin)
  2350. if err != nil {
  2351. return nil, err
  2352. }
  2353. if exists {
  2354. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2355. // Not sure if this makes sense, but it shouldn't hurt.
  2356. if newAdmin.Password == "" {
  2357. newAdmin.Password = admin.Password
  2358. }
  2359. newAdmin.Filters.TOTPConfig = admin.Filters.TOTPConfig
  2360. newAdmin.Filters.RecoveryCodes = admin.Filters.RecoveryCodes
  2361. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2362. } else {
  2363. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2364. if newAdmin.Password == "" {
  2365. newAdmin.Password = util.GenerateUniqueID()
  2366. }
  2367. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2368. }
  2369. return &newAdmin, err
  2370. }
  2371. func preserveUserProfile(user, newUser *dataprovider.User) {
  2372. if newUser.CanChangePassword() && user.Password != "" {
  2373. newUser.Password = user.Password
  2374. }
  2375. if newUser.CanManagePublicKeys() && len(user.PublicKeys) > 0 {
  2376. newUser.PublicKeys = user.PublicKeys
  2377. }
  2378. if newUser.CanManageTLSCerts() {
  2379. if len(user.Filters.TLSCerts) > 0 {
  2380. newUser.Filters.TLSCerts = user.Filters.TLSCerts
  2381. }
  2382. }
  2383. if newUser.CanChangeInfo() {
  2384. if user.Description != "" {
  2385. newUser.Description = user.Description
  2386. }
  2387. if user.Email != "" {
  2388. newUser.Email = user.Email
  2389. }
  2390. }
  2391. if newUser.CanChangeAPIKeyAuth() {
  2392. newUser.Filters.AllowAPIKeyAuth = user.Filters.AllowAPIKeyAuth
  2393. }
  2394. newUser.Filters.RecoveryCodes = user.Filters.RecoveryCodes
  2395. newUser.Filters.TOTPConfig = user.Filters.TOTPConfig
  2396. newUser.LastPasswordChange = user.LastPasswordChange
  2397. newUser.SetEmptySecretsIfNil()
  2398. }
  2399. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2400. user, err := dataprovider.UserExists(params.Name, "")
  2401. exists := err == nil
  2402. if exists && c.Mode == 1 {
  2403. err = user.LoadAndApplyGroupSettings()
  2404. return &user, err
  2405. }
  2406. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2407. return nil, err
  2408. }
  2409. replacements := params.getStringReplacements(false, true)
  2410. replacer := strings.NewReplacer(replacements...)
  2411. data := replaceWithReplacer(c.TemplateUser, replacer)
  2412. var newUser dataprovider.User
  2413. err = json.Unmarshal(util.StringToBytes(data), &newUser)
  2414. if err != nil {
  2415. return nil, err
  2416. }
  2417. if exists {
  2418. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2419. preserveUserProfile(&user, &newUser)
  2420. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2421. } else {
  2422. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2423. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2424. }
  2425. if err != nil {
  2426. return nil, err
  2427. }
  2428. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2429. return &u, err
  2430. }
  2431. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams, //nolint:gocyclo
  2432. conditions dataprovider.ConditionOptions,
  2433. ) error {
  2434. if len(conditions.EventStatuses) > 0 && !slices.Contains(conditions.EventStatuses, params.Status) {
  2435. eventManagerLog(logger.LevelDebug, "skipping action %s, event status %d does not match: %v",
  2436. action.Name, params.Status, conditions.EventStatuses)
  2437. return nil
  2438. }
  2439. var err error
  2440. switch action.Type {
  2441. case dataprovider.ActionTypeHTTP:
  2442. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2443. case dataprovider.ActionTypeCommand:
  2444. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2445. case dataprovider.ActionTypeEmail:
  2446. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2447. case dataprovider.ActionTypeBackup:
  2448. var backupPath string
  2449. backupPath, err = dataprovider.ExecuteBackup()
  2450. if err == nil {
  2451. params.setBackupParams(backupPath)
  2452. }
  2453. case dataprovider.ActionTypeUserQuotaReset:
  2454. err = executeUsersQuotaResetRuleAction(conditions, params)
  2455. case dataprovider.ActionTypeFolderQuotaReset:
  2456. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2457. case dataprovider.ActionTypeTransferQuotaReset:
  2458. err = executeTransferQuotaResetRuleAction(conditions, params)
  2459. case dataprovider.ActionTypeDataRetentionCheck:
  2460. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2461. case dataprovider.ActionTypeFilesystem:
  2462. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2463. case dataprovider.ActionTypePasswordExpirationCheck:
  2464. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2465. case dataprovider.ActionTypeUserExpirationCheck:
  2466. err = executeUserExpirationCheckRuleAction(conditions, params)
  2467. case dataprovider.ActionTypeUserInactivityCheck:
  2468. err = executeUserInactivityCheckRuleAction(action.Options.UserInactivityConfig, conditions, params, time.Now())
  2469. case dataprovider.ActionTypeRotateLogs:
  2470. err = logger.RotateLogFile()
  2471. default:
  2472. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2473. }
  2474. if err != nil {
  2475. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2476. }
  2477. params.AddError(err)
  2478. return err
  2479. }
  2480. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2481. *dataprovider.Admin, error,
  2482. ) {
  2483. for _, action := range rule.Actions {
  2484. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2485. startTime := time.Now()
  2486. var user *dataprovider.User
  2487. var admin *dataprovider.Admin
  2488. var err error
  2489. var failedActions []string
  2490. paramsCopy := params.getACopy()
  2491. switch params.Event {
  2492. case IDPLoginAdmin:
  2493. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2494. case IDPLoginUser:
  2495. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2496. default:
  2497. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2498. }
  2499. if err != nil {
  2500. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2501. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2502. failedActions = append(failedActions, action.Name)
  2503. } else {
  2504. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2505. action.Name, rule.Name, time.Since(startTime))
  2506. }
  2507. // execute async actions if any, including failure actions
  2508. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2509. return user, admin, err
  2510. }
  2511. }
  2512. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2513. return nil, nil, errors.New("no action executed")
  2514. }
  2515. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2516. var errRes error
  2517. for _, rule := range rules {
  2518. var failedActions []string
  2519. paramsCopy := params.getACopy()
  2520. for _, action := range rule.Actions {
  2521. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2522. startTime := time.Now()
  2523. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2524. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2525. action.Name, rule.Name, time.Since(startTime), err)
  2526. failedActions = append(failedActions, action.Name)
  2527. // we return the last error, it is ok for now
  2528. errRes = err
  2529. if action.Options.StopOnFailure {
  2530. break
  2531. }
  2532. } else {
  2533. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2534. action.Name, rule.Name, time.Since(startTime))
  2535. }
  2536. }
  2537. }
  2538. // execute async actions if any, including failure actions
  2539. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2540. }
  2541. return errRes
  2542. }
  2543. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2544. eventManager.addAsyncTask()
  2545. defer eventManager.removeAsyncTask()
  2546. params.addUID()
  2547. for _, rule := range rules {
  2548. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2549. }
  2550. }
  2551. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2552. for _, action := range rule.Actions {
  2553. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2554. startTime := time.Now()
  2555. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2556. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2557. action.Name, rule.Name, time.Since(startTime), err)
  2558. failedActions = append(failedActions, action.Name)
  2559. if action.Options.StopOnFailure {
  2560. break
  2561. }
  2562. } else {
  2563. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2564. action.Name, rule.Name, time.Since(startTime))
  2565. }
  2566. }
  2567. }
  2568. if len(failedActions) > 0 {
  2569. params.updateStatusFromError = false
  2570. // execute failure actions
  2571. for _, action := range rule.Actions {
  2572. if action.Options.IsFailureAction {
  2573. startTime := time.Now()
  2574. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2575. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2576. action.Name, rule.Name, time.Since(startTime), err)
  2577. if action.Options.StopOnFailure {
  2578. break
  2579. }
  2580. } else {
  2581. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2582. action.Name, rule.Name, time.Since(startTime))
  2583. }
  2584. }
  2585. }
  2586. }
  2587. }
  // eventCronJob is the job executed for a scheduled event rule. The rule is
  // identified by name and loaded from the data provider each time the job runs.
  type eventCronJob struct {
  	// ruleName is the name of the event rule to execute
  	ruleName string
  }
  2591. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2592. if rule.GuardFromConcurrentExecution() {
  2593. task, err := dataprovider.GetTaskByName(rule.Name)
  2594. if err != nil {
  2595. if errors.Is(err, util.ErrNotFound) {
  2596. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2597. task = dataprovider.Task{
  2598. Name: rule.Name,
  2599. UpdateAt: 0,
  2600. Version: 0,
  2601. }
  2602. err = dataprovider.AddTask(rule.Name)
  2603. if err != nil {
  2604. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2605. return task, err
  2606. }
  2607. } else {
  2608. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2609. }
  2610. }
  2611. return task, err
  2612. }
  2613. return dataprovider.Task{}, nil
  2614. }
  2615. func (j *eventCronJob) Run() {
  2616. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2617. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2618. if err != nil {
  2619. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2620. return
  2621. }
  2622. if err := rule.CheckActionsConsistency(""); err != nil {
  2623. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2624. return
  2625. }
  2626. task, err := j.getTask(&rule)
  2627. if err != nil {
  2628. return
  2629. }
  2630. if task.Name != "" {
  2631. updateInterval := 5 * time.Minute
  2632. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2633. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2634. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2635. return
  2636. }
  2637. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2638. if err != nil {
  2639. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2640. rule.Name, err)
  2641. return
  2642. }
  2643. ticker := time.NewTicker(updateInterval)
  2644. done := make(chan bool)
  2645. defer func() {
  2646. done <- true
  2647. ticker.Stop()
  2648. }()
  2649. go func(taskName string) {
  2650. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2651. for {
  2652. select {
  2653. case <-done:
  2654. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2655. return
  2656. case <-ticker.C:
  2657. err := dataprovider.UpdateTaskTimestamp(taskName)
  2658. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2659. }
  2660. }
  2661. }(task.Name)
  2662. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2663. } else {
  2664. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2665. }
  2666. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2667. }
  2668. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2669. func RunOnDemandRule(name string) error {
  2670. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2671. rule, err := dataprovider.EventRuleExists(name)
  2672. if err != nil {
  2673. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2674. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2675. }
  2676. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2677. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2678. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2679. }
  2680. if rule.Status != 1 {
  2681. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2682. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2683. }
  2684. if err := rule.CheckActionsConsistency(""); err != nil {
  2685. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2686. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2687. }
  2688. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2689. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2690. return nil
  2691. }
  // zipWriterWrapper groups a zip.Writer with a name and a set of entries.
  // NOTE(review): presumably Name is the archive name and Entries tracks the
  // entries already added to the archive - confirm against the callers.
  type zipWriterWrapper struct {
  	Name    string
  	Entries map[string]bool
  	Writer  *zip.Writer
  }
  // eventManagerLog logs a formatted message at the given level using
  // "eventmanager" as the second argument of logger.Log and an empty string as
  // the third one.
  func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  	logger.Log(level, "eventmanager", "", format, v...)
  }