eventmanager.go 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "errors"
  20. "fmt"
  21. "io"
  22. "mime"
  23. "mime/multipart"
  24. "net/http"
  25. "net/textproto"
  26. "net/url"
  27. "os"
  28. "os/exec"
  29. "path"
  30. "strconv"
  31. "strings"
  32. "sync"
  33. "sync/atomic"
  34. "time"
  35. "github.com/klauspost/compress/zip"
  36. "github.com/robfig/cron/v3"
  37. "github.com/rs/xid"
  38. "github.com/sftpgo/sdk"
  39. mail "github.com/xhit/go-simple-mail/v2"
  40. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  41. "github.com/drakkan/sftpgo/v2/internal/logger"
  42. "github.com/drakkan/sftpgo/v2/internal/plugin"
  43. "github.com/drakkan/sftpgo/v2/internal/smtp"
  44. "github.com/drakkan/sftpgo/v2/internal/util"
  45. "github.com/drakkan/sftpgo/v2/internal/vfs"
  46. )
  47. const (
  48. ipBlockedEventName = "IP Blocked"
  49. maxAttachmentsSize = int64(10 * 1024 * 1024)
  50. )
  51. var (
  52. // eventManager handle the supported event rules actions
  53. eventManager eventRulesContainer
  54. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  55. )
  56. func init() {
  57. eventManager = eventRulesContainer{
  58. schedulesMapping: make(map[string][]cron.EntryID),
  59. // arbitrary maximum number of concurrent asynchronous tasks,
  60. // each task could execute multiple actions
  61. concurrencyGuard: make(chan struct{}, 200),
  62. }
  63. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  64. func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
  65. eventManager.handleProviderEvent(EventParams{
  66. Name: executor,
  67. ObjectName: objectName,
  68. Event: operation,
  69. Status: 1,
  70. ObjectType: objectType,
  71. IP: ip,
  72. Timestamp: time.Now().UnixNano(),
  73. Object: object,
  74. })
  75. })
  76. }
  77. // HandleCertificateEvent checks and executes action rules for certificate events
  78. func HandleCertificateEvent(params EventParams) {
  79. eventManager.handleCertificateEvent(params)
  80. }
  81. // eventRulesContainer stores event rules by trigger
  82. type eventRulesContainer struct {
  83. sync.RWMutex
  84. lastLoad atomic.Int64
  85. FsEvents []dataprovider.EventRule
  86. ProviderEvents []dataprovider.EventRule
  87. Schedules []dataprovider.EventRule
  88. IPBlockedEvents []dataprovider.EventRule
  89. CertificateEvents []dataprovider.EventRule
  90. schedulesMapping map[string][]cron.EntryID
  91. concurrencyGuard chan struct{}
  92. }
  93. func (r *eventRulesContainer) addAsyncTask() {
  94. activeHooks.Add(1)
  95. r.concurrencyGuard <- struct{}{}
  96. }
  97. func (r *eventRulesContainer) removeAsyncTask() {
  98. activeHooks.Add(-1)
  99. <-r.concurrencyGuard
  100. }
  101. func (r *eventRulesContainer) getLastLoadTime() int64 {
  102. return r.lastLoad.Load()
  103. }
  104. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  105. r.lastLoad.Store(modTime)
  106. }
  107. // RemoveRule deletes the rule with the specified name
  108. func (r *eventRulesContainer) RemoveRule(name string) {
  109. r.Lock()
  110. defer r.Unlock()
  111. r.removeRuleInternal(name)
  112. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  113. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  114. }
  115. func (r *eventRulesContainer) removeRuleInternal(name string) {
  116. for idx := range r.FsEvents {
  117. if r.FsEvents[idx].Name == name {
  118. lastIdx := len(r.FsEvents) - 1
  119. r.FsEvents[idx] = r.FsEvents[lastIdx]
  120. r.FsEvents = r.FsEvents[:lastIdx]
  121. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  122. return
  123. }
  124. }
  125. for idx := range r.ProviderEvents {
  126. if r.ProviderEvents[idx].Name == name {
  127. lastIdx := len(r.ProviderEvents) - 1
  128. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  129. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  130. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  131. return
  132. }
  133. }
  134. for idx := range r.IPBlockedEvents {
  135. if r.IPBlockedEvents[idx].Name == name {
  136. lastIdx := len(r.IPBlockedEvents) - 1
  137. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  138. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  139. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  140. return
  141. }
  142. }
  143. for idx := range r.CertificateEvents {
  144. if r.CertificateEvents[idx].Name == name {
  145. lastIdx := len(r.CertificateEvents) - 1
  146. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  147. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  148. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  149. return
  150. }
  151. }
  152. for idx := range r.Schedules {
  153. if r.Schedules[idx].Name == name {
  154. if schedules, ok := r.schedulesMapping[name]; ok {
  155. for _, entryID := range schedules {
  156. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  157. eventScheduler.Remove(entryID)
  158. }
  159. delete(r.schedulesMapping, name)
  160. }
  161. lastIdx := len(r.Schedules) - 1
  162. r.Schedules[idx] = r.Schedules[lastIdx]
  163. r.Schedules = r.Schedules[:lastIdx]
  164. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  165. return
  166. }
  167. }
  168. }
  169. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  170. r.removeRuleInternal(rule.Name)
  171. if rule.DeletedAt > 0 {
  172. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  173. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  174. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  175. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  176. }
  177. return
  178. }
  179. switch rule.Trigger {
  180. case dataprovider.EventTriggerFsEvent:
  181. r.FsEvents = append(r.FsEvents, rule)
  182. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  183. case dataprovider.EventTriggerProviderEvent:
  184. r.ProviderEvents = append(r.ProviderEvents, rule)
  185. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  186. case dataprovider.EventTriggerIPBlocked:
  187. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  188. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  189. case dataprovider.EventTriggerCertificate:
  190. r.CertificateEvents = append(r.CertificateEvents, rule)
  191. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  192. case dataprovider.EventTriggerSchedule:
  193. for _, schedule := range rule.Conditions.Schedules {
  194. cronSpec := schedule.GetCronSpec()
  195. job := &eventCronJob{
  196. ruleName: dataprovider.ConvertName(rule.Name),
  197. }
  198. entryID, err := eventScheduler.AddJob(cronSpec, job)
  199. if err != nil {
  200. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  201. return
  202. }
  203. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  204. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  205. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  206. }
  207. r.Schedules = append(r.Schedules, rule)
  208. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  209. default:
  210. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  211. }
  212. }
  213. func (r *eventRulesContainer) loadRules() {
  214. eventManagerLog(logger.LevelDebug, "loading updated rules")
  215. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  216. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  217. if err != nil {
  218. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  219. return
  220. }
  221. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  222. if len(rules) > 0 {
  223. r.Lock()
  224. defer r.Unlock()
  225. for _, rule := range rules {
  226. r.addUpdateRuleInternal(rule)
  227. }
  228. }
  229. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
  230. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
  231. r.setLastLoadTime(modTime)
  232. }
  233. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  234. if !util.Contains(conditions.ProviderEvents, params.Event) {
  235. return false
  236. }
  237. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  238. return false
  239. }
  240. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  241. return false
  242. }
  243. return true
  244. }
  245. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  246. if !util.Contains(conditions.FsEvents, params.Event) {
  247. return false
  248. }
  249. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  250. return false
  251. }
  252. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  253. return false
  254. }
  255. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  256. if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
  257. return false
  258. }
  259. }
  260. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  261. return false
  262. }
  263. if params.Event == operationUpload || params.Event == operationDownload {
  264. if conditions.Options.MinFileSize > 0 {
  265. if params.FileSize < conditions.Options.MinFileSize {
  266. return false
  267. }
  268. }
  269. if conditions.Options.MaxFileSize > 0 {
  270. if params.FileSize > conditions.Options.MaxFileSize {
  271. return false
  272. }
  273. }
  274. }
  275. return true
  276. }
  277. // hasFsRules returns true if there are any rules for filesystem event triggers
  278. func (r *eventRulesContainer) hasFsRules() bool {
  279. r.RLock()
  280. defer r.RUnlock()
  281. return len(r.FsEvents) > 0
  282. }
  283. // handleFsEvent executes the rules actions defined for the specified event
  284. func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
  285. if params.Protocol == protocolEventAction {
  286. return nil
  287. }
  288. r.RLock()
  289. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  290. for _, rule := range r.FsEvents {
  291. if r.checkFsEventMatch(rule.Conditions, params) {
  292. if err := rule.CheckActionsConsistency(""); err != nil {
  293. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  294. rule.Name, err, params.Event)
  295. continue
  296. }
  297. hasSyncActions := false
  298. for _, action := range rule.Actions {
  299. if action.Options.ExecuteSync {
  300. hasSyncActions = true
  301. break
  302. }
  303. }
  304. if hasSyncActions {
  305. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  306. } else {
  307. rulesAsync = append(rulesAsync, rule)
  308. }
  309. }
  310. }
  311. r.RUnlock()
  312. params.sender = params.Name
  313. if len(rulesAsync) > 0 {
  314. go executeAsyncRulesActions(rulesAsync, params)
  315. }
  316. if len(rulesWithSyncActions) > 0 {
  317. return executeSyncRulesActions(rulesWithSyncActions, params)
  318. }
  319. return nil
  320. }
  321. // username is populated for user objects
  322. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  323. r.RLock()
  324. defer r.RUnlock()
  325. var rules []dataprovider.EventRule
  326. for _, rule := range r.ProviderEvents {
  327. if r.checkProviderEventMatch(rule.Conditions, params) {
  328. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  329. rules = append(rules, rule)
  330. } else {
  331. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  332. rule.Name, err, params.Event, params.ObjectType)
  333. }
  334. }
  335. }
  336. if len(rules) > 0 {
  337. params.sender = params.ObjectName
  338. go executeAsyncRulesActions(rules, params)
  339. }
  340. }
  341. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  342. r.RLock()
  343. defer r.RUnlock()
  344. if len(r.IPBlockedEvents) == 0 {
  345. return
  346. }
  347. var rules []dataprovider.EventRule
  348. for _, rule := range r.IPBlockedEvents {
  349. if err := rule.CheckActionsConsistency(""); err == nil {
  350. rules = append(rules, rule)
  351. } else {
  352. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  353. rule.Name, err, params.Event)
  354. }
  355. }
  356. if len(rules) > 0 {
  357. go executeAsyncRulesActions(rules, params)
  358. }
  359. }
  360. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  361. r.RLock()
  362. defer r.RUnlock()
  363. if len(r.CertificateEvents) == 0 {
  364. return
  365. }
  366. var rules []dataprovider.EventRule
  367. for _, rule := range r.CertificateEvents {
  368. if err := rule.CheckActionsConsistency(""); err == nil {
  369. rules = append(rules, rule)
  370. } else {
  371. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  372. rule.Name, err, params.Event)
  373. }
  374. }
  375. if len(rules) > 0 {
  376. go executeAsyncRulesActions(rules, params)
  377. }
  378. }
  379. type executedRetentionCheck struct {
  380. Username string
  381. ActionName string
  382. Results []folderRetentionCheckResult
  383. }
  384. // EventParams defines the supported event parameters
  385. type EventParams struct {
  386. Name string
  387. Groups []sdk.GroupMapping
  388. Event string
  389. Status int
  390. VirtualPath string
  391. FsPath string
  392. VirtualTargetPath string
  393. FsTargetPath string
  394. ObjectName string
  395. ObjectType string
  396. FileSize int64
  397. Protocol string
  398. IP string
  399. Timestamp int64
  400. Object plugin.Renderer
  401. sender string
  402. updateStatusFromError bool
  403. errors []string
  404. retentionChecks []executedRetentionCheck
  405. }
  406. func (p *EventParams) getACopy() *EventParams {
  407. params := *p
  408. params.errors = make([]string, len(p.errors))
  409. copy(params.errors, p.errors)
  410. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  411. for _, c := range p.retentionChecks {
  412. executedCheck := executedRetentionCheck{
  413. Username: c.Username,
  414. ActionName: c.ActionName,
  415. }
  416. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  417. copy(executedCheck.Results, c.Results)
  418. retentionChecks = append(retentionChecks, executedCheck)
  419. }
  420. params.retentionChecks = retentionChecks
  421. return &params
  422. }
  423. // AddError adds a new error to the event params and update the status if needed
  424. func (p *EventParams) AddError(err error) {
  425. if err == nil {
  426. return
  427. }
  428. if p.updateStatusFromError && p.Status == 1 {
  429. p.Status = 2
  430. }
  431. p.errors = append(p.errors, err.Error())
  432. }
  433. func (p *EventParams) getStatusString() string {
  434. switch p.Status {
  435. case 1:
  436. return "OK"
  437. default:
  438. return "KO"
  439. }
  440. }
  441. // getUsers returns users with group settings not applied
  442. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  443. if p.sender == "" {
  444. users, err := dataprovider.DumpUsers()
  445. if err != nil {
  446. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  447. return users, errors.New("unable to get users")
  448. }
  449. return users, nil
  450. }
  451. user, err := p.getUserFromSender()
  452. if err != nil {
  453. return nil, err
  454. }
  455. return []dataprovider.User{user}, nil
  456. }
  457. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  458. user, err := dataprovider.UserExists(p.sender)
  459. if err != nil {
  460. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  461. return user, fmt.Errorf("error getting user %q", p.sender)
  462. }
  463. return user, nil
  464. }
  465. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  466. if p.sender == "" {
  467. return dataprovider.DumpFolders()
  468. }
  469. folder, err := dataprovider.GetFolderByName(p.sender)
  470. if err != nil {
  471. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  472. }
  473. return []vfs.BaseVirtualFolder{folder}, nil
  474. }
  475. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  476. if len(p.retentionChecks) == 0 {
  477. return nil, errors.New("no data retention report available")
  478. }
  479. var b bytes.Buffer
  480. wr := zip.NewWriter(&b)
  481. for _, check := range p.retentionChecks {
  482. if size := int64(len(b.Bytes())); size > maxAttachmentsSize {
  483. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  484. return nil, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  485. }
  486. data, err := getCSVRetentionReport(check.Results)
  487. if err != nil {
  488. return nil, fmt.Errorf("unable to get CSV report: %w", err)
  489. }
  490. fh := &zip.FileHeader{
  491. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  492. Method: zip.Deflate,
  493. Modified: time.Now().UTC(),
  494. }
  495. f, err := wr.CreateHeader(fh)
  496. if err != nil {
  497. return nil, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  498. }
  499. _, err = io.Copy(f, bytes.NewBuffer(data))
  500. if err != nil {
  501. return nil, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  502. }
  503. }
  504. if err := wr.Close(); err != nil {
  505. return nil, fmt.Errorf("unable to close zip writer: %w", err)
  506. }
  507. return b.Bytes(), nil
  508. }
  509. func (p *EventParams) getRetentionReportsAsMailAttachment() (mail.File, error) {
  510. var result mail.File
  511. data, err := p.getCompressedDataRetentionReport()
  512. if err != nil {
  513. return result, err
  514. }
  515. result.Name = "retention-reports.zip"
  516. result.Data = data
  517. return result, nil
  518. }
  519. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  520. replacements := []string{
  521. "{{Name}}", p.Name,
  522. "{{Event}}", p.Event,
  523. "{{Status}}", fmt.Sprintf("%d", p.Status),
  524. "{{VirtualPath}}", p.VirtualPath,
  525. "{{FsPath}}", p.FsPath,
  526. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  527. "{{FsTargetPath}}", p.FsTargetPath,
  528. "{{ObjectName}}", p.ObjectName,
  529. "{{ObjectType}}", p.ObjectType,
  530. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  531. "{{Protocol}}", p.Protocol,
  532. "{{IP}}", p.IP,
  533. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  534. "{{StatusString}}", p.getStatusString(),
  535. }
  536. if len(p.errors) > 0 {
  537. replacements = append(replacements, "{{ErrorString}}", strings.Join(p.errors, ", "))
  538. } else {
  539. replacements = append(replacements, "{{ErrorString}}", "")
  540. }
  541. replacements = append(replacements, "{{ObjectData}}", "")
  542. if addObjectData {
  543. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  544. if err == nil {
  545. replacements[len(replacements)-1] = string(data)
  546. }
  547. }
  548. return replacements
  549. }
  550. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  551. var b bytes.Buffer
  552. csvWriter := csv.NewWriter(&b)
  553. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  554. "elapsed (ms)", "info", "error"})
  555. if err != nil {
  556. return nil, err
  557. }
  558. for _, result := range results {
  559. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  560. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  561. result.Info, result.Error})
  562. if err != nil {
  563. return nil, err
  564. }
  565. }
  566. csvWriter.Flush()
  567. err = csvWriter.Error()
  568. return b.Bytes(), err
  569. }
  570. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualPath string, numFiles int,
  571. truncatedSize int64, errTransfer error,
  572. ) error {
  573. errWrite := w.Close()
  574. info, err := conn.doStatInternal(virtualPath, 0, false, false)
  575. if err == nil {
  576. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, info.Size()-truncatedSize)
  577. _, fsPath, errFs := conn.GetFsAndResolvedPath(virtualPath)
  578. if errFs == nil {
  579. if errTransfer == nil {
  580. errTransfer = errWrite
  581. }
  582. ExecuteActionNotification(conn, operationUpload, fsPath, virtualPath, "", "", "", info.Size(), errTransfer) //nolint:errcheck
  583. }
  584. } else {
  585. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", virtualPath, err)
  586. }
  587. return errWrite
  588. }
  589. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  590. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  591. if err != nil {
  592. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  593. return
  594. }
  595. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  596. if vfolder.IsIncludedInUserQuota() {
  597. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  598. }
  599. }
  600. func getFileWriter(conn *BaseConnection, virtualPath string) (io.WriteCloser, int, int64, func(), error) {
  601. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  602. if err != nil {
  603. return nil, 0, 0, nil, err
  604. }
  605. var truncatedSize, fileSize int64
  606. numFiles := 1
  607. isFileOverwrite := false
  608. info, err := fs.Lstat(fsPath)
  609. if err == nil {
  610. fileSize = info.Size()
  611. if info.IsDir() {
  612. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  613. }
  614. if info.Mode().IsRegular() {
  615. isFileOverwrite = true
  616. truncatedSize = fileSize
  617. }
  618. numFiles = 0
  619. }
  620. if err != nil && !fs.IsNotExist(err) {
  621. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  622. }
  623. f, w, cancelFn, err := fs.Create(fsPath, 0)
  624. if err != nil {
  625. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  626. }
  627. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  628. if isFileOverwrite {
  629. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  630. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  631. truncatedSize = 0
  632. }
  633. }
  634. if cancelFn == nil {
  635. cancelFn = func() {}
  636. }
  637. if f != nil {
  638. return f, numFiles, truncatedSize, cancelFn, nil
  639. }
  640. return w, numFiles, truncatedSize, cancelFn, nil
  641. }
  642. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  643. if entryPath == wr.Name {
  644. // skip the archive itself
  645. return nil
  646. }
  647. info, err := conn.DoStat(entryPath, 1, false)
  648. if err != nil {
  649. eventManagerLog(logger.LevelError, "unable to add zip entry %#v, stat error: %v", entryPath, err)
  650. return err
  651. }
  652. entryName, err := getZipEntryName(entryPath, baseDir)
  653. if err != nil {
  654. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  655. return err
  656. }
  657. if _, ok := wr.Entries[entryName]; ok {
  658. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  659. return nil
  660. }
  661. wr.Entries[entryName] = true
  662. if info.IsDir() {
  663. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  664. Name: entryName + "/",
  665. Method: zip.Deflate,
  666. Modified: info.ModTime(),
  667. })
  668. if err != nil {
  669. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  670. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  671. }
  672. contents, err := conn.ListDir(entryPath)
  673. if err != nil {
  674. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  675. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  676. }
  677. for _, info := range contents {
  678. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  679. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  680. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  681. return err
  682. }
  683. }
  684. return nil
  685. }
  686. if !info.Mode().IsRegular() {
  687. // we only allow regular files
  688. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  689. return nil
  690. }
  691. reader, cancelFn, err := getFileReader(conn, entryPath)
  692. if err != nil {
  693. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  694. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  695. }
  696. defer cancelFn()
  697. defer reader.Close()
  698. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  699. Name: entryName,
  700. Method: zip.Deflate,
  701. Modified: info.ModTime(),
  702. })
  703. if err != nil {
  704. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  705. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  706. }
  707. _, err = io.Copy(f, reader)
  708. return err
  709. }
  710. func getZipEntryName(entryPath, baseDir string) (string, error) {
  711. if !strings.HasPrefix(entryPath, baseDir) {
  712. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  713. }
  714. entryPath = strings.TrimPrefix(entryPath, baseDir)
  715. return strings.TrimPrefix(entryPath, "/"), nil
  716. }
  717. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  718. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  719. if err != nil {
  720. return nil, nil, err
  721. }
  722. f, r, cancelFn, err := fs.Open(fsPath, 0)
  723. if err != nil {
  724. return nil, nil, conn.GetFsError(fs, err)
  725. }
  726. if cancelFn == nil {
  727. cancelFn = func() {}
  728. }
  729. if f != nil {
  730. return f, cancelFn, nil
  731. }
  732. return r, cancelFn, nil
  733. }
  734. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  735. reader, cancelFn, err := getFileReader(conn, virtualPath)
  736. if err != nil {
  737. return err
  738. }
  739. defer cancelFn()
  740. defer reader.Close()
  741. _, err = io.Copy(w, reader)
  742. return err
  743. }
  744. func getFileContent(conn *BaseConnection, virtualPath string, expectedSize int) ([]byte, error) {
  745. reader, cancelFn, err := getFileReader(conn, virtualPath)
  746. if err != nil {
  747. return nil, err
  748. }
  749. defer cancelFn()
  750. defer reader.Close()
  751. data := make([]byte, expectedSize)
  752. _, err = io.ReadFull(reader, data)
  753. return data, err
  754. }
  755. func getMailAttachments(user dataprovider.User, attachments []string, replacer *strings.Replacer) ([]mail.File, error) {
  756. var files []mail.File
  757. user, err := getUserForEventAction(user)
  758. if err != nil {
  759. return nil, err
  760. }
  761. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  762. err = user.CheckFsRoot(connectionID)
  763. defer user.CloseFs() //nolint:errcheck
  764. if err != nil {
  765. return nil, fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  766. }
  767. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  768. totalSize := int64(0)
  769. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  770. info, err := conn.DoStat(virtualPath, 0, false)
  771. if err != nil {
  772. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  773. }
  774. if !info.Mode().IsRegular() {
  775. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  776. }
  777. totalSize += info.Size()
  778. if totalSize > maxAttachmentsSize {
  779. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  780. }
  781. data, err := getFileContent(conn, virtualPath, int(info.Size()))
  782. if err != nil {
  783. return nil, fmt.Errorf("unable to get content for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  784. }
  785. files = append(files, mail.File{
  786. Name: path.Base(virtualPath),
  787. Data: data,
  788. })
  789. }
  790. return files, nil
  791. }
  792. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  793. if !strings.Contains(input, "{{") {
  794. return input
  795. }
  796. return replacer.Replace(input)
  797. }
  798. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  799. matched, err := path.Match(p.Pattern, name)
  800. if err != nil {
  801. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  802. return false
  803. }
  804. if p.InverseMatch {
  805. return !matched
  806. }
  807. return matched
  808. }
  809. // checkConditionPatterns returns false if patterns are defined and no match is found
  810. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  811. if len(patterns) == 0 {
  812. return true
  813. }
  814. for _, p := range patterns {
  815. if checkEventConditionPattern(p, name) {
  816. return true
  817. }
  818. }
  819. return false
  820. }
  821. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  822. if len(patterns) == 0 {
  823. return true
  824. }
  825. for _, group := range groups {
  826. for _, p := range patterns {
  827. if checkEventConditionPattern(p, group.Name) {
  828. return true
  829. }
  830. }
  831. }
  832. return false
  833. }
  834. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  835. if len(c.QueryParameters) > 0 {
  836. u, err := url.Parse(c.Endpoint)
  837. if err != nil {
  838. return "", fmt.Errorf("invalid endpoint: %w", err)
  839. }
  840. q := u.Query()
  841. for _, keyVal := range c.QueryParameters {
  842. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  843. }
  844. u.RawQuery = q.Encode()
  845. return u.String(), nil
  846. }
  847. return c.Endpoint, nil
  848. }
  849. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  850. conn *BaseConnection, replacer *strings.Replacer, params *EventParams,
  851. ) error {
  852. partWriter, err := m.CreatePart(h)
  853. if err != nil {
  854. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  855. return err
  856. }
  857. if part.Body != "" {
  858. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  859. if err != nil {
  860. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  861. return err
  862. }
  863. return nil
  864. }
  865. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  866. data, err := params.getCompressedDataRetentionReport()
  867. if err != nil {
  868. return err
  869. }
  870. _, err = partWriter.Write(data)
  871. if err != nil {
  872. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  873. return err
  874. }
  875. return nil
  876. }
  877. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  878. if err != nil {
  879. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  880. return err
  881. }
  882. return nil
  883. }
  884. func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  885. cancel context.CancelFunc, user dataprovider.User, params *EventParams,
  886. ) (io.ReadCloser, string, error) {
  887. var body io.ReadCloser
  888. if c.Method == http.MethodGet {
  889. return body, "", nil
  890. }
  891. if c.Body != "" {
  892. if c.Body == dataprovider.RetentionReportPlaceHolder {
  893. data, err := params.getCompressedDataRetentionReport()
  894. if err != nil {
  895. return body, "", err
  896. }
  897. return io.NopCloser(bytes.NewBuffer(data)), "", nil
  898. }
  899. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
  900. }
  901. if len(c.Parts) > 0 {
  902. r, w := io.Pipe()
  903. m := multipart.NewWriter(w)
  904. var conn *BaseConnection
  905. if user.Username != "" {
  906. var err error
  907. user, err = getUserForEventAction(user)
  908. if err != nil {
  909. return body, "", err
  910. }
  911. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  912. err = user.CheckFsRoot(connectionID)
  913. if err != nil {
  914. user.CloseFs() //nolint:errcheck
  915. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  916. user.Username, err)
  917. }
  918. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  919. }
  920. go func() {
  921. defer w.Close()
  922. defer user.CloseFs() //nolint:errcheck
  923. for _, part := range c.Parts {
  924. h := make(textproto.MIMEHeader)
  925. if part.Body != "" {
  926. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  927. } else {
  928. h.Set("Content-Disposition",
  929. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  930. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  931. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  932. if contentType == "" {
  933. contentType = "application/octet-stream"
  934. }
  935. h.Set("Content-Type", contentType)
  936. }
  937. for _, keyVal := range part.Headers {
  938. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  939. }
  940. if err := writeHTTPPart(m, part, h, conn, replacer, params); err != nil {
  941. cancel()
  942. return
  943. }
  944. }
  945. m.Close()
  946. }()
  947. return r, m.FormDataContentType(), nil
  948. }
  949. return body, "", nil
  950. }
  951. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  952. if err := c.TryDecryptPassword(); err != nil {
  953. return err
  954. }
  955. addObjectData := false
  956. if params.Object != nil {
  957. addObjectData = c.HasObjectData()
  958. }
  959. replacements := params.getStringReplacements(addObjectData)
  960. replacer := strings.NewReplacer(replacements...)
  961. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  962. if err != nil {
  963. return err
  964. }
  965. ctx, cancel := c.GetContext()
  966. defer cancel()
  967. var user dataprovider.User
  968. if c.HasMultipartFiles() {
  969. user, err = params.getUserFromSender()
  970. if err != nil {
  971. return err
  972. }
  973. }
  974. body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user, params)
  975. if err != nil {
  976. return err
  977. }
  978. if body != nil {
  979. defer body.Close()
  980. }
  981. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  982. if err != nil {
  983. return err
  984. }
  985. if contentType != "" {
  986. req.Header.Set("Content-Type", contentType)
  987. }
  988. if c.Username != "" {
  989. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  990. }
  991. for _, keyVal := range c.Headers {
  992. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  993. }
  994. client := c.GetHTTPClient()
  995. defer client.CloseIdleConnections()
  996. startTime := time.Now()
  997. resp, err := client.Do(req)
  998. if err != nil {
  999. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1000. endpoint, time.Since(startTime), err)
  1001. return fmt.Errorf("error sending HTTP request: %w", err)
  1002. }
  1003. defer resp.Body.Close()
  1004. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1005. endpoint, time.Since(startTime), resp.StatusCode)
  1006. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1007. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1008. }
  1009. return nil
  1010. }
  1011. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1012. addObjectData := false
  1013. if params.Object != nil {
  1014. for _, k := range c.EnvVars {
  1015. if strings.Contains(k.Value, "{{ObjectData}}") {
  1016. addObjectData = true
  1017. break
  1018. }
  1019. }
  1020. }
  1021. replacements := params.getStringReplacements(addObjectData)
  1022. replacer := strings.NewReplacer(replacements...)
  1023. args := make([]string, 0, len(c.Args))
  1024. for _, arg := range c.Args {
  1025. args = append(args, replaceWithReplacer(arg, replacer))
  1026. }
  1027. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1028. defer cancel()
  1029. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1030. cmd.Env = []string{}
  1031. for _, keyVal := range c.EnvVars {
  1032. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1033. }
  1034. startTime := time.Now()
  1035. err := cmd.Run()
  1036. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1037. c.Cmd, time.Since(startTime), err)
  1038. return err
  1039. }
  1040. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1041. addObjectData := false
  1042. if params.Object != nil {
  1043. if strings.Contains(c.Body, "{{ObjectData}}") {
  1044. addObjectData = true
  1045. }
  1046. }
  1047. replacements := params.getStringReplacements(addObjectData)
  1048. replacer := strings.NewReplacer(replacements...)
  1049. body := replaceWithReplacer(c.Body, replacer)
  1050. subject := replaceWithReplacer(c.Subject, replacer)
  1051. startTime := time.Now()
  1052. var files []mail.File
  1053. fileAttachments := make([]string, 0, len(c.Attachments))
  1054. for _, attachment := range c.Attachments {
  1055. if attachment == dataprovider.RetentionReportPlaceHolder {
  1056. f, err := params.getRetentionReportsAsMailAttachment()
  1057. if err != nil {
  1058. return err
  1059. }
  1060. files = append(files, f)
  1061. continue
  1062. }
  1063. fileAttachments = append(fileAttachments, attachment)
  1064. }
  1065. if len(fileAttachments) > 0 {
  1066. user, err := params.getUserFromSender()
  1067. if err != nil {
  1068. return err
  1069. }
  1070. res, err := getMailAttachments(user, fileAttachments, replacer)
  1071. if err != nil {
  1072. return err
  1073. }
  1074. files = append(files, res...)
  1075. }
  1076. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
  1077. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1078. time.Since(startTime), err)
  1079. if err != nil {
  1080. return fmt.Errorf("unable to send email: %w", err)
  1081. }
  1082. return nil
  1083. }
  1084. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1085. err := user.LoadAndApplyGroupSettings()
  1086. if err != nil {
  1087. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1088. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1089. }
  1090. user.UploadDataTransfer = 0
  1091. user.UploadBandwidth = 0
  1092. user.DownloadBandwidth = 0
  1093. user.Filters.DisableFsChecks = false
  1094. user.Filters.FilePatterns = nil
  1095. user.Filters.BandwidthLimits = nil
  1096. user.Filters.DataTransferLimits = nil
  1097. for k := range user.Permissions {
  1098. user.Permissions[k] = []string{dataprovider.PermAny}
  1099. }
  1100. return user, nil
  1101. }
  1102. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1103. for idx := range paths {
  1104. paths[idx] = util.CleanPath(replaceWithReplacer(paths[idx], replacer))
  1105. }
  1106. return util.RemoveDuplicates(paths, false)
  1107. }
  1108. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1109. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1110. if err != nil {
  1111. return err
  1112. }
  1113. return conn.RemoveFile(fs, fsPath, item, info)
  1114. }
  1115. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1116. user, err := getUserForEventAction(user)
  1117. if err != nil {
  1118. return err
  1119. }
  1120. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1121. err = user.CheckFsRoot(connectionID)
  1122. defer user.CloseFs() //nolint:errcheck
  1123. if err != nil {
  1124. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1125. }
  1126. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1127. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1128. info, err := conn.DoStat(item, 0, false)
  1129. if err != nil {
  1130. if conn.IsNotExistError(err) {
  1131. continue
  1132. }
  1133. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1134. }
  1135. if info.IsDir() {
  1136. if err = conn.RemoveDir(item); err != nil {
  1137. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1138. }
  1139. } else {
  1140. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1141. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1142. }
  1143. }
  1144. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1145. }
  1146. return nil
  1147. }
  1148. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1149. conditions dataprovider.ConditionOptions, params *EventParams,
  1150. ) error {
  1151. users, err := params.getUsers()
  1152. if err != nil {
  1153. return fmt.Errorf("unable to get users: %w", err)
  1154. }
  1155. var failures []string
  1156. executed := 0
  1157. for _, user := range users {
  1158. // if sender is set, the conditions have already been evaluated
  1159. if params.sender == "" {
  1160. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1161. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, name conditions don't match",
  1162. user.Username)
  1163. continue
  1164. }
  1165. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1166. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, group name conditions don't match",
  1167. user.Username)
  1168. continue
  1169. }
  1170. }
  1171. executed++
  1172. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1173. params.AddError(err)
  1174. failures = append(failures, user.Username)
  1175. continue
  1176. }
  1177. }
  1178. if len(failures) > 0 {
  1179. return fmt.Errorf("fs delete failed for users: %+v", failures)
  1180. }
  1181. if executed == 0 {
  1182. eventManagerLog(logger.LevelError, "no delete executed")
  1183. return errors.New("no delete executed")
  1184. }
  1185. return nil
  1186. }
  1187. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1188. user, err := getUserForEventAction(user)
  1189. if err != nil {
  1190. return err
  1191. }
  1192. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1193. err = user.CheckFsRoot(connectionID)
  1194. defer user.CloseFs() //nolint:errcheck
  1195. if err != nil {
  1196. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1197. }
  1198. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1199. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1200. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1201. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1202. }
  1203. if err = conn.createDirIfMissing(item); err != nil {
  1204. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1205. }
  1206. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1207. }
  1208. return nil
  1209. }
  1210. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1211. conditions dataprovider.ConditionOptions, params *EventParams,
  1212. ) error {
  1213. users, err := params.getUsers()
  1214. if err != nil {
  1215. return fmt.Errorf("unable to get users: %w", err)
  1216. }
  1217. var failures []string
  1218. executed := 0
  1219. for _, user := range users {
  1220. // if sender is set, the conditions have already been evaluated
  1221. if params.sender == "" {
  1222. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1223. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, name conditions don't match",
  1224. user.Username)
  1225. continue
  1226. }
  1227. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1228. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, group name conditions don't match",
  1229. user.Username)
  1230. continue
  1231. }
  1232. }
  1233. executed++
  1234. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1235. failures = append(failures, user.Username)
  1236. continue
  1237. }
  1238. }
  1239. if len(failures) > 0 {
  1240. return fmt.Errorf("fs mkdir failed for users: %+v", failures)
  1241. }
  1242. if executed == 0 {
  1243. eventManagerLog(logger.LevelError, "no mkdir executed")
  1244. return errors.New("no mkdir executed")
  1245. }
  1246. return nil
  1247. }
  1248. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1249. user dataprovider.User,
  1250. ) error {
  1251. user, err := getUserForEventAction(user)
  1252. if err != nil {
  1253. return err
  1254. }
  1255. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1256. err = user.CheckFsRoot(connectionID)
  1257. defer user.CloseFs() //nolint:errcheck
  1258. if err != nil {
  1259. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1260. }
  1261. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1262. for _, item := range renames {
  1263. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1264. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1265. if err = conn.Rename(source, target); err != nil {
  1266. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1267. }
  1268. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1269. }
  1270. return nil
  1271. }
  1272. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1273. user dataprovider.User,
  1274. ) error {
  1275. user, err := getUserForEventAction(user)
  1276. if err != nil {
  1277. return err
  1278. }
  1279. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1280. err = user.CheckFsRoot(connectionID)
  1281. defer user.CloseFs() //nolint:errcheck
  1282. if err != nil {
  1283. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1284. }
  1285. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1286. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1287. if _, err = conn.DoStat(item, 0, false); err != nil {
  1288. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1289. }
  1290. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1291. }
  1292. return nil
  1293. }
  1294. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1295. conditions dataprovider.ConditionOptions, params *EventParams,
  1296. ) error {
  1297. users, err := params.getUsers()
  1298. if err != nil {
  1299. return fmt.Errorf("unable to get users: %w", err)
  1300. }
  1301. var failures []string
  1302. executed := 0
  1303. for _, user := range users {
  1304. // if sender is set, the conditions have already been evaluated
  1305. if params.sender == "" {
  1306. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1307. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, name conditions don't match",
  1308. user.Username)
  1309. continue
  1310. }
  1311. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1312. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, group name conditions don't match",
  1313. user.Username)
  1314. continue
  1315. }
  1316. }
  1317. executed++
  1318. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1319. failures = append(failures, user.Username)
  1320. params.AddError(err)
  1321. continue
  1322. }
  1323. }
  1324. if len(failures) > 0 {
  1325. return fmt.Errorf("fs rename failed for users: %+v", failures)
  1326. }
  1327. if executed == 0 {
  1328. eventManagerLog(logger.LevelError, "no rename executed")
  1329. return errors.New("no rename executed")
  1330. }
  1331. return nil
  1332. }
  1333. func getArchiveBaseDir(paths []string) string {
  1334. var parentDirs []string
  1335. for _, p := range paths {
  1336. parentDirs = append(parentDirs, path.Dir(p))
  1337. }
  1338. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1339. baseDir := "/"
  1340. if len(parentDirs) == 1 {
  1341. baseDir = parentDirs[0]
  1342. }
  1343. return baseDir
  1344. }
  1345. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1346. user dataprovider.User,
  1347. ) error {
  1348. user, err := getUserForEventAction(user)
  1349. if err != nil {
  1350. return err
  1351. }
  1352. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1353. err = user.CheckFsRoot(connectionID)
  1354. defer user.CloseFs() //nolint:errcheck
  1355. if err != nil {
  1356. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1357. }
  1358. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1359. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1360. paths := make([]string, 0, len(c.Paths))
  1361. for idx := range c.Paths {
  1362. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1363. if p == name {
  1364. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1365. }
  1366. paths = append(paths, p)
  1367. }
  1368. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name)
  1369. if err != nil {
  1370. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1371. return fmt.Errorf("unable to create archive: %w", err)
  1372. }
  1373. defer cancelFn()
  1374. paths = util.RemoveDuplicates(paths, false)
  1375. baseDir := getArchiveBaseDir(paths)
  1376. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1377. zipWriter := &zipWriterWrapper{
  1378. Name: name,
  1379. Writer: zip.NewWriter(writer),
  1380. Entries: make(map[string]bool),
  1381. }
  1382. for _, item := range paths {
  1383. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1384. closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
  1385. return err
  1386. }
  1387. }
  1388. if err := zipWriter.Writer.Close(); err != nil {
  1389. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1390. closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
  1391. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1392. }
  1393. return closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err)
  1394. }
  1395. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1396. params *EventParams,
  1397. ) error {
  1398. users, err := params.getUsers()
  1399. if err != nil {
  1400. return fmt.Errorf("unable to get users: %w", err)
  1401. }
  1402. var failures []string
  1403. executed := 0
  1404. for _, user := range users {
  1405. // if sender is set, the conditions have already been evaluated
  1406. if params.sender == "" {
  1407. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1408. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, name conditions don't match",
  1409. user.Username)
  1410. continue
  1411. }
  1412. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1413. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, group name conditions don't match",
  1414. user.Username)
  1415. continue
  1416. }
  1417. }
  1418. executed++
  1419. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1420. failures = append(failures, user.Username)
  1421. params.AddError(err)
  1422. continue
  1423. }
  1424. }
  1425. if len(failures) > 0 {
  1426. return fmt.Errorf("fs existence check failed for users: %+v", failures)
  1427. }
  1428. if executed == 0 {
  1429. eventManagerLog(logger.LevelError, "no existence check executed")
  1430. return errors.New("no existence check executed")
  1431. }
  1432. return nil
  1433. }
  1434. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1435. conditions dataprovider.ConditionOptions, params *EventParams,
  1436. ) error {
  1437. users, err := params.getUsers()
  1438. if err != nil {
  1439. return fmt.Errorf("unable to get users: %w", err)
  1440. }
  1441. var failures []string
  1442. executed := 0
  1443. for _, user := range users {
  1444. // if sender is set, the conditions have already been evaluated
  1445. if params.sender == "" {
  1446. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1447. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, name conditions don't match",
  1448. user.Username)
  1449. continue
  1450. }
  1451. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1452. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, group name conditions don't match",
  1453. user.Username)
  1454. continue
  1455. }
  1456. }
  1457. executed++
  1458. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1459. failures = append(failures, user.Username)
  1460. params.AddError(err)
  1461. continue
  1462. }
  1463. }
  1464. if len(failures) > 0 {
  1465. return fmt.Errorf("fs compress failed for users: %+v", failures)
  1466. }
  1467. if executed == 0 {
  1468. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1469. return errors.New("no file/folder compressed")
  1470. }
  1471. return nil
  1472. }
  1473. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1474. params *EventParams,
  1475. ) error {
  1476. addObjectData := false
  1477. replacements := params.getStringReplacements(addObjectData)
  1478. replacer := strings.NewReplacer(replacements...)
  1479. switch c.Type {
  1480. case dataprovider.FilesystemActionRename:
  1481. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1482. case dataprovider.FilesystemActionDelete:
  1483. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1484. case dataprovider.FilesystemActionMkdirs:
  1485. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1486. case dataprovider.FilesystemActionExist:
  1487. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1488. case dataprovider.FilesystemActionCompress:
  1489. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1490. default:
  1491. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1492. }
  1493. }
  1494. func executeQuotaResetForUser(user dataprovider.User) error {
  1495. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1496. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1497. user.Username, err)
  1498. return err
  1499. }
  1500. if !QuotaScans.AddUserQuotaScan(user.Username) {
  1501. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1502. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1503. }
  1504. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1505. numFiles, size, err := user.ScanQuota()
  1506. if err != nil {
  1507. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1508. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1509. }
  1510. err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
  1511. if err != nil {
  1512. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1513. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1514. }
  1515. return nil
  1516. }
  1517. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1518. users, err := params.getUsers()
  1519. if err != nil {
  1520. return fmt.Errorf("unable to get users: %w", err)
  1521. }
  1522. var failedResets []string
  1523. executed := 0
  1524. for _, user := range users {
  1525. // if sender is set, the conditions have already been evaluated
  1526. if params.sender == "" {
  1527. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1528. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, name conditions don't match",
  1529. user.Username)
  1530. continue
  1531. }
  1532. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1533. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, group name conditions don't match",
  1534. user.Username)
  1535. continue
  1536. }
  1537. }
  1538. executed++
  1539. if err = executeQuotaResetForUser(user); err != nil {
  1540. params.AddError(err)
  1541. failedResets = append(failedResets, user.Username)
  1542. continue
  1543. }
  1544. }
  1545. if len(failedResets) > 0 {
  1546. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  1547. }
  1548. if executed == 0 {
  1549. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1550. return errors.New("no user quota reset executed")
  1551. }
  1552. return nil
  1553. }
  1554. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1555. folders, err := params.getFolders()
  1556. if err != nil {
  1557. return fmt.Errorf("unable to get folders: %w", err)
  1558. }
  1559. var failedResets []string
  1560. executed := 0
  1561. for _, folder := range folders {
  1562. // if sender is set, the conditions have already been evaluated
  1563. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1564. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1565. folder.Name)
  1566. continue
  1567. }
  1568. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1569. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1570. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1571. failedResets = append(failedResets, folder.Name)
  1572. continue
  1573. }
  1574. executed++
  1575. f := vfs.VirtualFolder{
  1576. BaseVirtualFolder: folder,
  1577. VirtualPath: "/",
  1578. }
  1579. numFiles, size, err := f.ScanQuota()
  1580. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1581. if err != nil {
  1582. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1583. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1584. failedResets = append(failedResets, folder.Name)
  1585. continue
  1586. }
  1587. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1588. if err != nil {
  1589. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1590. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1591. failedResets = append(failedResets, folder.Name)
  1592. }
  1593. }
  1594. if len(failedResets) > 0 {
  1595. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  1596. }
  1597. if executed == 0 {
  1598. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1599. return errors.New("no folder quota reset executed")
  1600. }
  1601. return nil
  1602. }
  1603. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1604. users, err := params.getUsers()
  1605. if err != nil {
  1606. return fmt.Errorf("unable to get users: %w", err)
  1607. }
  1608. var failedResets []string
  1609. executed := 0
  1610. for _, user := range users {
  1611. // if sender is set, the conditions have already been evaluated
  1612. if params.sender == "" {
  1613. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1614. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
  1615. user.Username)
  1616. continue
  1617. }
  1618. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1619. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, group name conditions don't match",
  1620. user.Username)
  1621. continue
  1622. }
  1623. }
  1624. executed++
  1625. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1626. if err != nil {
  1627. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1628. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1629. failedResets = append(failedResets, user.Username)
  1630. }
  1631. }
  1632. if len(failedResets) > 0 {
  1633. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  1634. }
  1635. if executed == 0 {
  1636. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1637. return errors.New("no transfer quota reset executed")
  1638. }
  1639. return nil
  1640. }
  1641. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  1642. params *EventParams, actionName string,
  1643. ) error {
  1644. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1645. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  1646. user.Username, err)
  1647. return err
  1648. }
  1649. check := RetentionCheck{
  1650. Folders: folders,
  1651. }
  1652. c := RetentionChecks.Add(check, &user)
  1653. if c == nil {
  1654. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  1655. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  1656. }
  1657. defer func() {
  1658. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  1659. Username: user.Username,
  1660. ActionName: actionName,
  1661. Results: c.results,
  1662. })
  1663. }()
  1664. if err := c.Start(); err != nil {
  1665. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  1666. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  1667. }
  1668. return nil
  1669. }
  1670. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  1671. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  1672. ) error {
  1673. users, err := params.getUsers()
  1674. if err != nil {
  1675. return fmt.Errorf("unable to get users: %w", err)
  1676. }
  1677. var failedChecks []string
  1678. executed := 0
  1679. for _, user := range users {
  1680. // if sender is set, the conditions have already been evaluated
  1681. if params.sender == "" {
  1682. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1683. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, name conditions don't match",
  1684. user.Username)
  1685. continue
  1686. }
  1687. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1688. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, group name conditions don't match",
  1689. user.Username)
  1690. continue
  1691. }
  1692. }
  1693. executed++
  1694. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  1695. failedChecks = append(failedChecks, user.Username)
  1696. params.AddError(err)
  1697. continue
  1698. }
  1699. }
  1700. if len(failedChecks) > 0 {
  1701. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  1702. }
  1703. if executed == 0 {
  1704. eventManagerLog(logger.LevelError, "no retention check executed")
  1705. return errors.New("no retention check executed")
  1706. }
  1707. return nil
  1708. }
  1709. func executeMetadataCheckForUser(user dataprovider.User) error {
  1710. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1711. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1712. user.Username, err)
  1713. return err
  1714. }
  1715. if !ActiveMetadataChecks.Add(user.Username) {
  1716. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  1717. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  1718. }
  1719. defer ActiveMetadataChecks.Remove(user.Username)
  1720. if err := user.CheckMetadataConsistency(); err != nil {
  1721. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  1722. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  1723. }
  1724. return nil
  1725. }
  1726. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1727. users, err := params.getUsers()
  1728. if err != nil {
  1729. return fmt.Errorf("unable to get users: %w", err)
  1730. }
  1731. var failures []string
  1732. var executed int
  1733. for _, user := range users {
  1734. // if sender is set, the conditions have already been evaluated
  1735. if params.sender == "" {
  1736. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1737. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, name conditions don't match",
  1738. user.Username)
  1739. continue
  1740. }
  1741. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1742. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, group name conditions don't match",
  1743. user.Username)
  1744. continue
  1745. }
  1746. }
  1747. executed++
  1748. if err = executeMetadataCheckForUser(user); err != nil {
  1749. params.AddError(err)
  1750. failures = append(failures, user.Username)
  1751. continue
  1752. }
  1753. }
  1754. if len(failures) > 0 {
  1755. return fmt.Errorf("metadata check failed for users: %+v", failures)
  1756. }
  1757. if executed == 0 {
  1758. eventManagerLog(logger.LevelError, "no metadata check executed")
  1759. return errors.New("no metadata check executed")
  1760. }
  1761. return nil
  1762. }
  1763. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  1764. conditions dataprovider.ConditionOptions,
  1765. ) error {
  1766. var err error
  1767. switch action.Type {
  1768. case dataprovider.ActionTypeHTTP:
  1769. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  1770. case dataprovider.ActionTypeCommand:
  1771. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  1772. case dataprovider.ActionTypeEmail:
  1773. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  1774. case dataprovider.ActionTypeBackup:
  1775. err = dataprovider.ExecuteBackup()
  1776. case dataprovider.ActionTypeUserQuotaReset:
  1777. err = executeUsersQuotaResetRuleAction(conditions, params)
  1778. case dataprovider.ActionTypeFolderQuotaReset:
  1779. err = executeFoldersQuotaResetRuleAction(conditions, params)
  1780. case dataprovider.ActionTypeTransferQuotaReset:
  1781. err = executeTransferQuotaResetRuleAction(conditions, params)
  1782. case dataprovider.ActionTypeDataRetentionCheck:
  1783. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  1784. case dataprovider.ActionTypeMetadataCheck:
  1785. err = executeMetadataCheckRuleAction(conditions, params)
  1786. case dataprovider.ActionTypeFilesystem:
  1787. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  1788. default:
  1789. err = fmt.Errorf("unsupported action type: %d", action.Type)
  1790. }
  1791. if err != nil {
  1792. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  1793. }
  1794. params.AddError(err)
  1795. return err
  1796. }
  1797. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  1798. var errRes error
  1799. for _, rule := range rules {
  1800. var failedActions []string
  1801. paramsCopy := params.getACopy()
  1802. for _, action := range rule.Actions {
  1803. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  1804. startTime := time.Now()
  1805. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  1806. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  1807. action.Name, rule.Name, time.Since(startTime), err)
  1808. failedActions = append(failedActions, action.Name)
  1809. // we return the last error, it is ok for now
  1810. errRes = err
  1811. if action.Options.StopOnFailure {
  1812. break
  1813. }
  1814. } else {
  1815. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  1816. action.Name, rule.Name, time.Since(startTime))
  1817. }
  1818. }
  1819. }
  1820. // execute async actions if any, including failure actions
  1821. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  1822. }
  1823. return errRes
  1824. }
  1825. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  1826. eventManager.addAsyncTask()
  1827. defer eventManager.removeAsyncTask()
  1828. for _, rule := range rules {
  1829. executeRuleAsyncActions(rule, params.getACopy(), nil)
  1830. }
  1831. }
  1832. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  1833. for _, action := range rule.Actions {
  1834. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  1835. startTime := time.Now()
  1836. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1837. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  1838. action.Name, rule.Name, time.Since(startTime), err)
  1839. failedActions = append(failedActions, action.Name)
  1840. if action.Options.StopOnFailure {
  1841. break
  1842. }
  1843. } else {
  1844. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  1845. action.Name, rule.Name, time.Since(startTime))
  1846. }
  1847. }
  1848. }
  1849. if len(failedActions) > 0 {
  1850. params.updateStatusFromError = false
  1851. // execute failure actions
  1852. for _, action := range rule.Actions {
  1853. if action.Options.IsFailureAction {
  1854. startTime := time.Now()
  1855. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1856. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  1857. action.Name, rule.Name, time.Since(startTime), err)
  1858. if action.Options.StopOnFailure {
  1859. break
  1860. }
  1861. } else {
  1862. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  1863. action.Name, rule.Name, time.Since(startTime))
  1864. }
  1865. }
  1866. }
  1867. }
  1868. }
  // eventCronJob is the scheduled job bound to a single event rule. Only the
  // rule name is stored: Run loads the rule by this name every time it executes.
  type eventCronJob struct {
  	// ruleName is the name passed to dataprovider.EventRuleExists by Run.
  	ruleName string
  }
  1872. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  1873. if rule.GuardFromConcurrentExecution() {
  1874. task, err := dataprovider.GetTaskByName(rule.Name)
  1875. if _, ok := err.(*util.RecordNotFoundError); ok {
  1876. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  1877. task = dataprovider.Task{
  1878. Name: rule.Name,
  1879. UpdateAt: 0,
  1880. Version: 0,
  1881. }
  1882. err = dataprovider.AddTask(rule.Name)
  1883. if err != nil {
  1884. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  1885. return task, err
  1886. }
  1887. } else {
  1888. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  1889. }
  1890. return task, err
  1891. }
  1892. return dataprovider.Task{}, nil
  1893. }
  1894. func (j *eventCronJob) Run() {
  1895. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  1896. rule, err := dataprovider.EventRuleExists(j.ruleName)
  1897. if err != nil {
  1898. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  1899. return
  1900. }
  1901. if err = rule.CheckActionsConsistency(""); err != nil {
  1902. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  1903. return
  1904. }
  1905. task, err := j.getTask(rule)
  1906. if err != nil {
  1907. return
  1908. }
  1909. if task.Name != "" {
  1910. updateInterval := 5 * time.Minute
  1911. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  1912. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  1913. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  1914. return
  1915. }
  1916. err = dataprovider.UpdateTask(rule.Name, task.Version)
  1917. if err != nil {
  1918. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  1919. rule.Name, err)
  1920. return
  1921. }
  1922. ticker := time.NewTicker(updateInterval)
  1923. done := make(chan bool)
  1924. defer func() {
  1925. done <- true
  1926. ticker.Stop()
  1927. }()
  1928. go func(taskName string) {
  1929. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  1930. for {
  1931. select {
  1932. case <-done:
  1933. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  1934. return
  1935. case <-ticker.C:
  1936. err := dataprovider.UpdateTaskTimestamp(taskName)
  1937. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  1938. }
  1939. }
  1940. }(task.Name)
  1941. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  1942. } else {
  1943. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  1944. }
  1945. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  1946. }
  // zipWriterWrapper groups a zip.Writer with a name and a set of entries.
  // NOTE(review): the code using this type is outside this section; the field
  // notes below are assumptions to confirm against it.
  type zipWriterWrapper struct {
  	// Name is presumably the name of the archive being written.
  	Name string
  	// Entries is presumably keyed by the entry names already added.
  	Entries map[string]bool
  	// Writer is the wrapped zip writer.
  	Writer *zip.Writer
  }
  1952. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  1953. logger.Log(level, "eventmanager", "", format, v...)
  1954. }