eventmanager.go 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "errors"
  20. "fmt"
  21. "io"
  22. "mime"
  23. "mime/multipart"
  24. "net/http"
  25. "net/textproto"
  26. "net/url"
  27. "os"
  28. "os/exec"
  29. "path"
  30. "path/filepath"
  31. "strconv"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "time"
  36. "github.com/klauspost/compress/zip"
  37. "github.com/robfig/cron/v3"
  38. "github.com/rs/xid"
  39. "github.com/sftpgo/sdk"
  40. mail "github.com/xhit/go-simple-mail/v2"
  41. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  42. "github.com/drakkan/sftpgo/v2/internal/logger"
  43. "github.com/drakkan/sftpgo/v2/internal/plugin"
  44. "github.com/drakkan/sftpgo/v2/internal/smtp"
  45. "github.com/drakkan/sftpgo/v2/internal/util"
  46. "github.com/drakkan/sftpgo/v2/internal/vfs"
  47. )
const (
	// ipBlockedEventName is the event name for IP blocked events.
	// NOTE(review): its usage is outside the visible part of the file — confirm.
	ipBlockedEventName = "IP Blocked"
	// maxAttachmentsSize is a size limit in bytes (10 MiB). It is compared
	// against the size of the compressed retention report while it is built.
	maxAttachmentsSize = int64(10 * 1024 * 1024)
)
var (
	// eventManager handles the supported event rules actions.
	// It is initialized in init().
	eventManager eventRulesContainer
	// multipartQuoteEscaper escapes backslashes and double quotes with a
	// leading backslash.
	// NOTE(review): presumably used when building multipart headers; the
	// usage is not visible in this part of the file.
	multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
)
  57. func init() {
  58. eventManager = eventRulesContainer{
  59. schedulesMapping: make(map[string][]cron.EntryID),
  60. // arbitrary maximum number of concurrent asynchronous tasks,
  61. // each task could execute multiple actions
  62. concurrencyGuard: make(chan struct{}, 200),
  63. }
  64. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  65. func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
  66. eventManager.handleProviderEvent(EventParams{
  67. Name: executor,
  68. ObjectName: objectName,
  69. Event: operation,
  70. Status: 1,
  71. ObjectType: objectType,
  72. IP: ip,
  73. Timestamp: time.Now().UnixNano(),
  74. Object: object,
  75. })
  76. })
  77. }
// HandleCertificateEvent checks and executes action rules for certificate events.
// It delegates to the package-level eventManager.
func HandleCertificateEvent(params EventParams) {
	eventManager.handleCertificateEvent(params)
}
// eventRulesContainer stores event rules by trigger.
// The embedded RWMutex is taken by the methods that read or modify the
// rule slices.
type eventRulesContainer struct {
	sync.RWMutex
	// lastLoad is the value stored by setLastLoadTime; loadRules sets it to
	// the load start time as milliseconds since epoch
	lastLoad atomic.Int64
	// FsEvents holds the rules with the filesystem event trigger
	FsEvents []dataprovider.EventRule
	// ProviderEvents holds the rules with the provider event trigger
	ProviderEvents []dataprovider.EventRule
	// Schedules holds the rules with the schedule trigger
	Schedules []dataprovider.EventRule
	// IPBlockedEvents holds the rules with the IP blocked trigger
	IPBlockedEvents []dataprovider.EventRule
	// CertificateEvents holds the rules with the certificate trigger
	CertificateEvents []dataprovider.EventRule
	// schedulesMapping maps a rule name to the cron entry IDs registered
	// for its schedules
	schedulesMapping map[string][]cron.EntryID
	// concurrencyGuard is a buffered channel: addAsyncTask sends to it and
	// removeAsyncTask receives from it
	concurrencyGuard chan struct{}
}
// addAsyncTask adds 1 to activeHooks and then sends to concurrencyGuard.
// The send blocks while the channel buffer is full.
func (r *eventRulesContainer) addAsyncTask() {
	activeHooks.Add(1)
	r.concurrencyGuard <- struct{}{}
}
// removeAsyncTask adds -1 to activeHooks and then receives from
// concurrencyGuard, releasing a slot taken by addAsyncTask.
func (r *eventRulesContainer) removeAsyncTask() {
	activeHooks.Add(-1)
	<-r.concurrencyGuard
}
// getLastLoadTime atomically returns the value stored by setLastLoadTime.
func (r *eventRulesContainer) getLastLoadTime() int64 {
	return r.lastLoad.Load()
}
// setLastLoadTime atomically stores modTime as the last load time.
func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
	r.lastLoad.Store(modTime)
}
  108. // RemoveRule deletes the rule with the specified name
  109. func (r *eventRulesContainer) RemoveRule(name string) {
  110. r.Lock()
  111. defer r.Unlock()
  112. r.removeRuleInternal(name)
  113. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  114. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  115. }
  116. func (r *eventRulesContainer) removeRuleInternal(name string) {
  117. for idx := range r.FsEvents {
  118. if r.FsEvents[idx].Name == name {
  119. lastIdx := len(r.FsEvents) - 1
  120. r.FsEvents[idx] = r.FsEvents[lastIdx]
  121. r.FsEvents = r.FsEvents[:lastIdx]
  122. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  123. return
  124. }
  125. }
  126. for idx := range r.ProviderEvents {
  127. if r.ProviderEvents[idx].Name == name {
  128. lastIdx := len(r.ProviderEvents) - 1
  129. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  130. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  131. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  132. return
  133. }
  134. }
  135. for idx := range r.IPBlockedEvents {
  136. if r.IPBlockedEvents[idx].Name == name {
  137. lastIdx := len(r.IPBlockedEvents) - 1
  138. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  139. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  140. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  141. return
  142. }
  143. }
  144. for idx := range r.CertificateEvents {
  145. if r.CertificateEvents[idx].Name == name {
  146. lastIdx := len(r.CertificateEvents) - 1
  147. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  148. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  149. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  150. return
  151. }
  152. }
  153. for idx := range r.Schedules {
  154. if r.Schedules[idx].Name == name {
  155. if schedules, ok := r.schedulesMapping[name]; ok {
  156. for _, entryID := range schedules {
  157. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  158. eventScheduler.Remove(entryID)
  159. }
  160. delete(r.schedulesMapping, name)
  161. }
  162. lastIdx := len(r.Schedules) - 1
  163. r.Schedules[idx] = r.Schedules[lastIdx]
  164. r.Schedules = r.Schedules[:lastIdx]
  165. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  166. return
  167. }
  168. }
  169. }
  170. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  171. r.removeRuleInternal(rule.Name)
  172. if rule.DeletedAt > 0 {
  173. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  174. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  175. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  176. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  177. }
  178. return
  179. }
  180. switch rule.Trigger {
  181. case dataprovider.EventTriggerFsEvent:
  182. r.FsEvents = append(r.FsEvents, rule)
  183. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  184. case dataprovider.EventTriggerProviderEvent:
  185. r.ProviderEvents = append(r.ProviderEvents, rule)
  186. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  187. case dataprovider.EventTriggerIPBlocked:
  188. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  189. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  190. case dataprovider.EventTriggerCertificate:
  191. r.CertificateEvents = append(r.CertificateEvents, rule)
  192. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  193. case dataprovider.EventTriggerSchedule:
  194. for _, schedule := range rule.Conditions.Schedules {
  195. cronSpec := schedule.GetCronSpec()
  196. job := &eventCronJob{
  197. ruleName: dataprovider.ConvertName(rule.Name),
  198. }
  199. entryID, err := eventScheduler.AddJob(cronSpec, job)
  200. if err != nil {
  201. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  202. return
  203. }
  204. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  205. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  206. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  207. }
  208. r.Schedules = append(r.Schedules, rule)
  209. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  210. default:
  211. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  212. }
  213. }
  214. func (r *eventRulesContainer) loadRules() {
  215. eventManagerLog(logger.LevelDebug, "loading updated rules")
  216. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  217. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  218. if err != nil {
  219. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  220. return
  221. }
  222. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  223. if len(rules) > 0 {
  224. r.Lock()
  225. defer r.Unlock()
  226. for _, rule := range rules {
  227. r.addUpdateRuleInternal(rule)
  228. }
  229. }
  230. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
  231. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
  232. r.setLastLoadTime(modTime)
  233. }
  234. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  235. if !util.Contains(conditions.ProviderEvents, params.Event) {
  236. return false
  237. }
  238. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  239. return false
  240. }
  241. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  242. return false
  243. }
  244. return true
  245. }
  246. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  247. if !util.Contains(conditions.FsEvents, params.Event) {
  248. return false
  249. }
  250. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  251. return false
  252. }
  253. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  254. return false
  255. }
  256. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  257. if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
  258. return false
  259. }
  260. }
  261. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  262. return false
  263. }
  264. if params.Event == operationUpload || params.Event == operationDownload {
  265. if conditions.Options.MinFileSize > 0 {
  266. if params.FileSize < conditions.Options.MinFileSize {
  267. return false
  268. }
  269. }
  270. if conditions.Options.MaxFileSize > 0 {
  271. if params.FileSize > conditions.Options.MaxFileSize {
  272. return false
  273. }
  274. }
  275. }
  276. return true
  277. }
// hasFsRules returns true if there are any rules for filesystem event triggers.
// The check is done while holding the read lock.
func (r *eventRulesContainer) hasFsRules() bool {
	r.RLock()
	defer r.RUnlock()
	return len(r.FsEvents) > 0
}
  284. // handleFsEvent executes the rules actions defined for the specified event
  285. func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
  286. if params.Protocol == protocolEventAction {
  287. return nil
  288. }
  289. r.RLock()
  290. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  291. for _, rule := range r.FsEvents {
  292. if r.checkFsEventMatch(rule.Conditions, params) {
  293. if err := rule.CheckActionsConsistency(""); err != nil {
  294. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  295. rule.Name, err, params.Event)
  296. continue
  297. }
  298. hasSyncActions := false
  299. for _, action := range rule.Actions {
  300. if action.Options.ExecuteSync {
  301. hasSyncActions = true
  302. break
  303. }
  304. }
  305. if hasSyncActions {
  306. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  307. } else {
  308. rulesAsync = append(rulesAsync, rule)
  309. }
  310. }
  311. }
  312. r.RUnlock()
  313. params.sender = params.Name
  314. if len(rulesAsync) > 0 {
  315. go executeAsyncRulesActions(rulesAsync, params)
  316. }
  317. if len(rulesWithSyncActions) > 0 {
  318. return executeSyncRulesActions(rulesWithSyncActions, params)
  319. }
  320. return nil
  321. }
  322. // username is populated for user objects
  323. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  324. r.RLock()
  325. defer r.RUnlock()
  326. var rules []dataprovider.EventRule
  327. for _, rule := range r.ProviderEvents {
  328. if r.checkProviderEventMatch(rule.Conditions, params) {
  329. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  330. rules = append(rules, rule)
  331. } else {
  332. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  333. rule.Name, err, params.Event, params.ObjectType)
  334. }
  335. }
  336. }
  337. if len(rules) > 0 {
  338. params.sender = params.ObjectName
  339. go executeAsyncRulesActions(rules, params)
  340. }
  341. }
  342. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  343. r.RLock()
  344. defer r.RUnlock()
  345. if len(r.IPBlockedEvents) == 0 {
  346. return
  347. }
  348. var rules []dataprovider.EventRule
  349. for _, rule := range r.IPBlockedEvents {
  350. if err := rule.CheckActionsConsistency(""); err == nil {
  351. rules = append(rules, rule)
  352. } else {
  353. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  354. rule.Name, err, params.Event)
  355. }
  356. }
  357. if len(rules) > 0 {
  358. go executeAsyncRulesActions(rules, params)
  359. }
  360. }
  361. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  362. r.RLock()
  363. defer r.RUnlock()
  364. if len(r.CertificateEvents) == 0 {
  365. return
  366. }
  367. var rules []dataprovider.EventRule
  368. for _, rule := range r.CertificateEvents {
  369. if err := rule.CheckActionsConsistency(""); err == nil {
  370. rules = append(rules, rule)
  371. } else {
  372. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  373. rule.Name, err, params.Event)
  374. }
  375. }
  376. if len(rules) > 0 {
  377. go executeAsyncRulesActions(rules, params)
  378. }
  379. }
// executedRetentionCheck stores the results of a retention check.
// Username and ActionName are used to name the CSV file added to the
// compressed retention report, Results are its rows.
type executedRetentionCheck struct {
	Username   string
	ActionName string
	Results    []folderRetentionCheckResult
}
// EventParams defines the supported event parameters.
// The exported string and numeric fields are exposed as placeholders,
// for example {{Name}}, by getStringReplacements.
type EventParams struct {
	// Name is set to the executor for provider events and it is used as
	// sender for filesystem events
	Name   string
	Groups []sdk.GroupMapping
	Event  string
	// Status 1 is reported as "OK", any other value as "KO"
	Status            int
	VirtualPath       string
	FsPath            string
	VirtualTargetPath string
	FsTargetPath      string
	ObjectName        string
	ObjectType        string
	FileSize          int64
	Protocol          string
	IP                string
	// Timestamp is set using time.Now().UnixNano() where it is populated
	// in this file
	Timestamp int64
	// Object is rendered as JSON for the {{ObjectData}} placeholder
	Object plugin.Renderer
	// sender selects the user or folder to act on, if empty getUsers and
	// getFolders return all the users/folders
	sender string
	// updateStatusFromError, if set, makes AddError change Status from 1 to 2
	updateStatusFromError bool
	// errors collects the messages added using AddError
	errors []string
	// retentionChecks holds the data used to build the retention reports
	retentionChecks []executedRetentionCheck
}
  407. func (p *EventParams) getACopy() *EventParams {
  408. params := *p
  409. params.errors = make([]string, len(p.errors))
  410. copy(params.errors, p.errors)
  411. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  412. for _, c := range p.retentionChecks {
  413. executedCheck := executedRetentionCheck{
  414. Username: c.Username,
  415. ActionName: c.ActionName,
  416. }
  417. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  418. copy(executedCheck.Results, c.Results)
  419. retentionChecks = append(retentionChecks, executedCheck)
  420. }
  421. params.retentionChecks = retentionChecks
  422. return &params
  423. }
  424. // AddError adds a new error to the event params and update the status if needed
  425. func (p *EventParams) AddError(err error) {
  426. if err == nil {
  427. return
  428. }
  429. if p.updateStatusFromError && p.Status == 1 {
  430. p.Status = 2
  431. }
  432. p.errors = append(p.errors, err.Error())
  433. }
  434. func (p *EventParams) setBackupParams(backupPath string) {
  435. if p.sender != "" {
  436. return
  437. }
  438. p.sender = dataprovider.ActionExecutorSystem
  439. p.FsPath = backupPath
  440. p.ObjectName = filepath.Base(backupPath)
  441. p.VirtualPath = "/" + p.ObjectName
  442. p.Timestamp = time.Now().UnixNano()
  443. info, err := os.Stat(backupPath)
  444. if err == nil {
  445. p.FileSize = info.Size()
  446. }
  447. }
  448. func (p *EventParams) getStatusString() string {
  449. switch p.Status {
  450. case 1:
  451. return "OK"
  452. default:
  453. return "KO"
  454. }
  455. }
  456. // getUsers returns users with group settings not applied
  457. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  458. if p.sender == "" {
  459. users, err := dataprovider.DumpUsers()
  460. if err != nil {
  461. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  462. return users, errors.New("unable to get users")
  463. }
  464. return users, nil
  465. }
  466. user, err := p.getUserFromSender()
  467. if err != nil {
  468. return nil, err
  469. }
  470. return []dataprovider.User{user}, nil
  471. }
  472. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  473. if p.sender == dataprovider.ActionExecutorSystem {
  474. return dataprovider.User{
  475. BaseUser: sdk.BaseUser{
  476. Status: 1,
  477. Username: p.sender,
  478. HomeDir: dataprovider.GetBackupsPath(),
  479. Permissions: map[string][]string{
  480. "/": {dataprovider.PermAny},
  481. },
  482. },
  483. }, nil
  484. }
  485. user, err := dataprovider.UserExists(p.sender)
  486. if err != nil {
  487. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  488. return user, fmt.Errorf("error getting user %q", p.sender)
  489. }
  490. return user, nil
  491. }
  492. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  493. if p.sender == "" {
  494. return dataprovider.DumpFolders()
  495. }
  496. folder, err := dataprovider.GetFolderByName(p.sender)
  497. if err != nil {
  498. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  499. }
  500. return []vfs.BaseVirtualFolder{folder}, nil
  501. }
  502. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  503. if len(p.retentionChecks) == 0 {
  504. return nil, errors.New("no data retention report available")
  505. }
  506. var b bytes.Buffer
  507. wr := zip.NewWriter(&b)
  508. for _, check := range p.retentionChecks {
  509. if size := int64(len(b.Bytes())); size > maxAttachmentsSize {
  510. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  511. return nil, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  512. }
  513. data, err := getCSVRetentionReport(check.Results)
  514. if err != nil {
  515. return nil, fmt.Errorf("unable to get CSV report: %w", err)
  516. }
  517. fh := &zip.FileHeader{
  518. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  519. Method: zip.Deflate,
  520. Modified: time.Now().UTC(),
  521. }
  522. f, err := wr.CreateHeader(fh)
  523. if err != nil {
  524. return nil, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  525. }
  526. _, err = io.Copy(f, bytes.NewBuffer(data))
  527. if err != nil {
  528. return nil, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  529. }
  530. }
  531. if err := wr.Close(); err != nil {
  532. return nil, fmt.Errorf("unable to close zip writer: %w", err)
  533. }
  534. return b.Bytes(), nil
  535. }
  536. func (p *EventParams) getRetentionReportsAsMailAttachment() (mail.File, error) {
  537. var result mail.File
  538. data, err := p.getCompressedDataRetentionReport()
  539. if err != nil {
  540. return result, err
  541. }
  542. result.Name = "retention-reports.zip"
  543. result.Data = data
  544. return result, nil
  545. }
  546. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  547. replacements := []string{
  548. "{{Name}}", p.Name,
  549. "{{Event}}", p.Event,
  550. "{{Status}}", fmt.Sprintf("%d", p.Status),
  551. "{{VirtualPath}}", p.VirtualPath,
  552. "{{FsPath}}", p.FsPath,
  553. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  554. "{{FsTargetPath}}", p.FsTargetPath,
  555. "{{ObjectName}}", p.ObjectName,
  556. "{{ObjectType}}", p.ObjectType,
  557. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  558. "{{Protocol}}", p.Protocol,
  559. "{{IP}}", p.IP,
  560. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  561. "{{StatusString}}", p.getStatusString(),
  562. }
  563. if p.VirtualPath != "" {
  564. replacements = append(replacements, "{{VirtualDirPath}}", path.Dir(p.VirtualPath))
  565. }
  566. if p.VirtualTargetPath != "" {
  567. replacements = append(replacements, "{{VirtualTargetDirPath}}", path.Dir(p.VirtualTargetPath))
  568. replacements = append(replacements, "{{TargetName}}", path.Base(p.VirtualTargetPath))
  569. }
  570. if len(p.errors) > 0 {
  571. replacements = append(replacements, "{{ErrorString}}", strings.Join(p.errors, ", "))
  572. } else {
  573. replacements = append(replacements, "{{ErrorString}}", "")
  574. }
  575. replacements = append(replacements, "{{ObjectData}}", "")
  576. if addObjectData {
  577. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  578. if err == nil {
  579. replacements[len(replacements)-1] = string(data)
  580. }
  581. }
  582. return replacements
  583. }
  584. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  585. var b bytes.Buffer
  586. csvWriter := csv.NewWriter(&b)
  587. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  588. "elapsed (ms)", "info", "error"})
  589. if err != nil {
  590. return nil, err
  591. }
  592. for _, result := range results {
  593. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  594. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  595. result.Info, result.Error})
  596. if err != nil {
  597. return nil, err
  598. }
  599. }
  600. csvWriter.Flush()
  601. err = csvWriter.Error()
  602. return b.Bytes(), err
  603. }
  604. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualPath string, numFiles int,
  605. truncatedSize int64, errTransfer error,
  606. ) error {
  607. errWrite := w.Close()
  608. info, err := conn.doStatInternal(virtualPath, 0, false, false)
  609. if err == nil {
  610. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, info.Size()-truncatedSize)
  611. _, fsPath, errFs := conn.GetFsAndResolvedPath(virtualPath)
  612. if errFs == nil {
  613. if errTransfer == nil {
  614. errTransfer = errWrite
  615. }
  616. ExecuteActionNotification(conn, operationUpload, fsPath, virtualPath, "", "", "", info.Size(), errTransfer) //nolint:errcheck
  617. }
  618. } else {
  619. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", virtualPath, err)
  620. }
  621. return errWrite
  622. }
  623. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  624. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  625. if err != nil {
  626. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  627. return
  628. }
  629. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  630. if vfolder.IsIncludedInUserQuota() {
  631. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  632. }
  633. }
  634. func getFileWriter(conn *BaseConnection, virtualPath string) (io.WriteCloser, int, int64, func(), error) {
  635. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  636. if err != nil {
  637. return nil, 0, 0, nil, err
  638. }
  639. var truncatedSize, fileSize int64
  640. numFiles := 1
  641. isFileOverwrite := false
  642. info, err := fs.Lstat(fsPath)
  643. if err == nil {
  644. fileSize = info.Size()
  645. if info.IsDir() {
  646. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  647. }
  648. if info.Mode().IsRegular() {
  649. isFileOverwrite = true
  650. truncatedSize = fileSize
  651. }
  652. numFiles = 0
  653. }
  654. if err != nil && !fs.IsNotExist(err) {
  655. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  656. }
  657. f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1))
  658. if err != nil {
  659. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  660. }
  661. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  662. if isFileOverwrite {
  663. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  664. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  665. truncatedSize = 0
  666. }
  667. }
  668. if cancelFn == nil {
  669. cancelFn = func() {}
  670. }
  671. if f != nil {
  672. return f, numFiles, truncatedSize, cancelFn, nil
  673. }
  674. return w, numFiles, truncatedSize, cancelFn, nil
  675. }
  676. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  677. if entryPath == wr.Name {
  678. // skip the archive itself
  679. return nil
  680. }
  681. info, err := conn.DoStat(entryPath, 1, false)
  682. if err != nil {
  683. eventManagerLog(logger.LevelError, "unable to add zip entry %#v, stat error: %v", entryPath, err)
  684. return err
  685. }
  686. entryName, err := getZipEntryName(entryPath, baseDir)
  687. if err != nil {
  688. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  689. return err
  690. }
  691. if _, ok := wr.Entries[entryName]; ok {
  692. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  693. return nil
  694. }
  695. wr.Entries[entryName] = true
  696. if info.IsDir() {
  697. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  698. Name: entryName + "/",
  699. Method: zip.Deflate,
  700. Modified: info.ModTime(),
  701. })
  702. if err != nil {
  703. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  704. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  705. }
  706. contents, err := conn.ListDir(entryPath)
  707. if err != nil {
  708. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  709. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  710. }
  711. for _, info := range contents {
  712. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  713. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  714. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  715. return err
  716. }
  717. }
  718. return nil
  719. }
  720. if !info.Mode().IsRegular() {
  721. // we only allow regular files
  722. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  723. return nil
  724. }
  725. reader, cancelFn, err := getFileReader(conn, entryPath)
  726. if err != nil {
  727. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  728. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  729. }
  730. defer cancelFn()
  731. defer reader.Close()
  732. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  733. Name: entryName,
  734. Method: zip.Deflate,
  735. Modified: info.ModTime(),
  736. })
  737. if err != nil {
  738. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  739. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  740. }
  741. _, err = io.Copy(f, reader)
  742. return err
  743. }
  // getZipEntryName returns the name to use inside the archive for entryPath,
  // that is entryPath relative to baseDir without the leading slash.
  // An error is returned if entryPath is neither baseDir itself nor located
  // below it. The empty string is returned when entryPath equals baseDir.
  func getZipEntryName(entryPath, baseDir string) (string, error) {
  	relPath := strings.TrimPrefix(entryPath, baseDir)
  	// A plain prefix match would also accept siblings sharing a name prefix
  	// (e.g. "/ab/c" for base dir "/a"), so the match must end on a path
  	// separator, unless the two paths are equal or no base dir is set.
  	onBoundary := relPath == "" || baseDir == "" ||
  		strings.HasSuffix(baseDir, "/") || strings.HasPrefix(relPath, "/")
  	if !strings.HasPrefix(entryPath, baseDir) || !onBoundary {
  		return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  	}
  	return strings.TrimPrefix(relPath, "/"), nil
  }
  751. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  752. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  753. if err != nil {
  754. return nil, nil, err
  755. }
  756. f, r, cancelFn, err := fs.Open(fsPath, 0)
  757. if err != nil {
  758. return nil, nil, conn.GetFsError(fs, err)
  759. }
  760. if cancelFn == nil {
  761. cancelFn = func() {}
  762. }
  763. if f != nil {
  764. return f, cancelFn, nil
  765. }
  766. return r, cancelFn, nil
  767. }
  768. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  769. reader, cancelFn, err := getFileReader(conn, virtualPath)
  770. if err != nil {
  771. return err
  772. }
  773. defer cancelFn()
  774. defer reader.Close()
  775. _, err = io.Copy(w, reader)
  776. return err
  777. }
  778. func getFileContent(conn *BaseConnection, virtualPath string, expectedSize int) ([]byte, error) {
  779. reader, cancelFn, err := getFileReader(conn, virtualPath)
  780. if err != nil {
  781. return nil, err
  782. }
  783. defer cancelFn()
  784. defer reader.Close()
  785. data := make([]byte, expectedSize)
  786. _, err = io.ReadFull(reader, data)
  787. return data, err
  788. }
  789. func getMailAttachments(user dataprovider.User, attachments []string, replacer *strings.Replacer) ([]mail.File, error) {
  790. var files []mail.File
  791. user, err := getUserForEventAction(user)
  792. if err != nil {
  793. return nil, err
  794. }
  795. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  796. err = user.CheckFsRoot(connectionID)
  797. defer user.CloseFs() //nolint:errcheck
  798. if err != nil {
  799. return nil, fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  800. }
  801. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  802. totalSize := int64(0)
  803. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  804. info, err := conn.DoStat(virtualPath, 0, false)
  805. if err != nil {
  806. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  807. }
  808. if !info.Mode().IsRegular() {
  809. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  810. }
  811. totalSize += info.Size()
  812. if totalSize > maxAttachmentsSize {
  813. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  814. }
  815. data, err := getFileContent(conn, virtualPath, int(info.Size()))
  816. if err != nil {
  817. return nil, fmt.Errorf("unable to get content for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  818. }
  819. files = append(files, mail.File{
  820. Name: path.Base(virtualPath),
  821. Data: data,
  822. })
  823. }
  824. return files, nil
  825. }
  // replaceWithReplacer applies replacer to input. Inputs that do not contain
  // the "{{" placeholder marker are returned unchanged without invoking the
  // replacer.
  func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  	if strings.Contains(input, "{{") {
  		return replacer.Replace(input)
  	}
  	return input
  }
  832. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  833. matched, err := path.Match(p.Pattern, name)
  834. if err != nil {
  835. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  836. return false
  837. }
  838. if p.InverseMatch {
  839. return !matched
  840. }
  841. return matched
  842. }
  843. // checkConditionPatterns returns false if patterns are defined and no match is found
  844. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  845. if len(patterns) == 0 {
  846. return true
  847. }
  848. for _, p := range patterns {
  849. if checkEventConditionPattern(p, name) {
  850. return true
  851. }
  852. }
  853. return false
  854. }
  855. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  856. if len(patterns) == 0 {
  857. return true
  858. }
  859. for _, group := range groups {
  860. for _, p := range patterns {
  861. if checkEventConditionPattern(p, group.Name) {
  862. return true
  863. }
  864. }
  865. }
  866. return false
  867. }
  868. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  869. if len(c.QueryParameters) > 0 {
  870. u, err := url.Parse(c.Endpoint)
  871. if err != nil {
  872. return "", fmt.Errorf("invalid endpoint: %w", err)
  873. }
  874. q := u.Query()
  875. for _, keyVal := range c.QueryParameters {
  876. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  877. }
  878. u.RawQuery = q.Encode()
  879. return u.String(), nil
  880. }
  881. return c.Endpoint, nil
  882. }
  883. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  884. conn *BaseConnection, replacer *strings.Replacer, params *EventParams,
  885. ) error {
  886. partWriter, err := m.CreatePart(h)
  887. if err != nil {
  888. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  889. return err
  890. }
  891. if part.Body != "" {
  892. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  893. if err != nil {
  894. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  895. return err
  896. }
  897. return nil
  898. }
  899. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  900. data, err := params.getCompressedDataRetentionReport()
  901. if err != nil {
  902. return err
  903. }
  904. _, err = partWriter.Write(data)
  905. if err != nil {
  906. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  907. return err
  908. }
  909. return nil
  910. }
  911. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  912. if err != nil {
  913. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  914. return err
  915. }
  916. return nil
  917. }
  918. func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  919. cancel context.CancelFunc, user dataprovider.User, params *EventParams,
  920. ) (io.ReadCloser, string, error) {
  921. var body io.ReadCloser
  922. if c.Method == http.MethodGet {
  923. return body, "", nil
  924. }
  925. if c.Body != "" {
  926. if c.Body == dataprovider.RetentionReportPlaceHolder {
  927. data, err := params.getCompressedDataRetentionReport()
  928. if err != nil {
  929. return body, "", err
  930. }
  931. return io.NopCloser(bytes.NewBuffer(data)), "", nil
  932. }
  933. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
  934. }
  935. if len(c.Parts) > 0 {
  936. r, w := io.Pipe()
  937. m := multipart.NewWriter(w)
  938. var conn *BaseConnection
  939. if user.Username != "" {
  940. var err error
  941. user, err = getUserForEventAction(user)
  942. if err != nil {
  943. return body, "", err
  944. }
  945. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  946. err = user.CheckFsRoot(connectionID)
  947. if err != nil {
  948. user.CloseFs() //nolint:errcheck
  949. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  950. user.Username, err)
  951. }
  952. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  953. }
  954. go func() {
  955. defer w.Close()
  956. defer user.CloseFs() //nolint:errcheck
  957. for _, part := range c.Parts {
  958. h := make(textproto.MIMEHeader)
  959. if part.Body != "" {
  960. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  961. } else {
  962. h.Set("Content-Disposition",
  963. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  964. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  965. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  966. if contentType == "" {
  967. contentType = "application/octet-stream"
  968. }
  969. h.Set("Content-Type", contentType)
  970. }
  971. for _, keyVal := range part.Headers {
  972. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  973. }
  974. if err := writeHTTPPart(m, part, h, conn, replacer, params); err != nil {
  975. cancel()
  976. return
  977. }
  978. }
  979. m.Close()
  980. }()
  981. return r, m.FormDataContentType(), nil
  982. }
  983. return body, "", nil
  984. }
  985. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  986. if err := c.TryDecryptPassword(); err != nil {
  987. return err
  988. }
  989. addObjectData := false
  990. if params.Object != nil {
  991. addObjectData = c.HasObjectData()
  992. }
  993. replacements := params.getStringReplacements(addObjectData)
  994. replacer := strings.NewReplacer(replacements...)
  995. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  996. if err != nil {
  997. return err
  998. }
  999. ctx, cancel := c.GetContext()
  1000. defer cancel()
  1001. var user dataprovider.User
  1002. if c.HasMultipartFiles() {
  1003. user, err = params.getUserFromSender()
  1004. if err != nil {
  1005. return err
  1006. }
  1007. }
  1008. body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user, params)
  1009. if err != nil {
  1010. return err
  1011. }
  1012. if body != nil {
  1013. defer body.Close()
  1014. }
  1015. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1016. if err != nil {
  1017. return err
  1018. }
  1019. if contentType != "" {
  1020. req.Header.Set("Content-Type", contentType)
  1021. }
  1022. if c.Username != "" {
  1023. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1024. }
  1025. for _, keyVal := range c.Headers {
  1026. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1027. }
  1028. client := c.GetHTTPClient()
  1029. defer client.CloseIdleConnections()
  1030. startTime := time.Now()
  1031. resp, err := client.Do(req)
  1032. if err != nil {
  1033. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1034. endpoint, time.Since(startTime), err)
  1035. return fmt.Errorf("error sending HTTP request: %w", err)
  1036. }
  1037. defer resp.Body.Close()
  1038. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1039. endpoint, time.Since(startTime), resp.StatusCode)
  1040. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1041. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1042. }
  1043. return nil
  1044. }
  1045. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1046. addObjectData := false
  1047. if params.Object != nil {
  1048. for _, k := range c.EnvVars {
  1049. if strings.Contains(k.Value, "{{ObjectData}}") {
  1050. addObjectData = true
  1051. break
  1052. }
  1053. }
  1054. }
  1055. replacements := params.getStringReplacements(addObjectData)
  1056. replacer := strings.NewReplacer(replacements...)
  1057. args := make([]string, 0, len(c.Args))
  1058. for _, arg := range c.Args {
  1059. args = append(args, replaceWithReplacer(arg, replacer))
  1060. }
  1061. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1062. defer cancel()
  1063. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1064. cmd.Env = []string{}
  1065. for _, keyVal := range c.EnvVars {
  1066. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1067. }
  1068. startTime := time.Now()
  1069. err := cmd.Run()
  1070. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1071. c.Cmd, time.Since(startTime), err)
  1072. return err
  1073. }
  1074. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1075. addObjectData := false
  1076. if params.Object != nil {
  1077. if strings.Contains(c.Body, "{{ObjectData}}") {
  1078. addObjectData = true
  1079. }
  1080. }
  1081. replacements := params.getStringReplacements(addObjectData)
  1082. replacer := strings.NewReplacer(replacements...)
  1083. body := replaceWithReplacer(c.Body, replacer)
  1084. subject := replaceWithReplacer(c.Subject, replacer)
  1085. startTime := time.Now()
  1086. var files []mail.File
  1087. fileAttachments := make([]string, 0, len(c.Attachments))
  1088. for _, attachment := range c.Attachments {
  1089. if attachment == dataprovider.RetentionReportPlaceHolder {
  1090. f, err := params.getRetentionReportsAsMailAttachment()
  1091. if err != nil {
  1092. return err
  1093. }
  1094. files = append(files, f)
  1095. continue
  1096. }
  1097. fileAttachments = append(fileAttachments, attachment)
  1098. }
  1099. if len(fileAttachments) > 0 {
  1100. user, err := params.getUserFromSender()
  1101. if err != nil {
  1102. return err
  1103. }
  1104. res, err := getMailAttachments(user, fileAttachments, replacer)
  1105. if err != nil {
  1106. return err
  1107. }
  1108. files = append(files, res...)
  1109. }
  1110. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
  1111. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1112. time.Since(startTime), err)
  1113. if err != nil {
  1114. return fmt.Errorf("unable to send email: %w", err)
  1115. }
  1116. return nil
  1117. }
  1118. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1119. err := user.LoadAndApplyGroupSettings()
  1120. if err != nil {
  1121. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1122. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1123. }
  1124. user.UploadDataTransfer = 0
  1125. user.UploadBandwidth = 0
  1126. user.DownloadBandwidth = 0
  1127. user.Filters.DisableFsChecks = false
  1128. user.Filters.FilePatterns = nil
  1129. user.Filters.BandwidthLimits = nil
  1130. user.Filters.DataTransferLimits = nil
  1131. for k := range user.Permissions {
  1132. user.Permissions[k] = []string{dataprovider.PermAny}
  1133. }
  1134. return user, nil
  1135. }
  1136. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1137. results := make([]string, 0, len(paths))
  1138. for _, p := range paths {
  1139. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1140. }
  1141. return util.RemoveDuplicates(results, false)
  1142. }
  1143. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1144. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1145. if err != nil {
  1146. return err
  1147. }
  1148. return conn.RemoveFile(fs, fsPath, item, info)
  1149. }
  1150. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1151. user, err := getUserForEventAction(user)
  1152. if err != nil {
  1153. return err
  1154. }
  1155. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1156. err = user.CheckFsRoot(connectionID)
  1157. defer user.CloseFs() //nolint:errcheck
  1158. if err != nil {
  1159. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1160. }
  1161. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1162. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1163. info, err := conn.DoStat(item, 0, false)
  1164. if err != nil {
  1165. if conn.IsNotExistError(err) {
  1166. continue
  1167. }
  1168. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1169. }
  1170. if info.IsDir() {
  1171. if err = conn.RemoveDir(item); err != nil {
  1172. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1173. }
  1174. } else {
  1175. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1176. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1177. }
  1178. }
  1179. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1180. }
  1181. return nil
  1182. }
  1183. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1184. conditions dataprovider.ConditionOptions, params *EventParams,
  1185. ) error {
  1186. users, err := params.getUsers()
  1187. if err != nil {
  1188. return fmt.Errorf("unable to get users: %w", err)
  1189. }
  1190. var failures []string
  1191. executed := 0
  1192. for _, user := range users {
  1193. // if sender is set, the conditions have already been evaluated
  1194. if params.sender == "" {
  1195. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1196. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, name conditions don't match",
  1197. user.Username)
  1198. continue
  1199. }
  1200. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1201. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, group name conditions don't match",
  1202. user.Username)
  1203. continue
  1204. }
  1205. }
  1206. executed++
  1207. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1208. params.AddError(err)
  1209. failures = append(failures, user.Username)
  1210. continue
  1211. }
  1212. }
  1213. if len(failures) > 0 {
  1214. return fmt.Errorf("fs delete failed for users: %+v", failures)
  1215. }
  1216. if executed == 0 {
  1217. eventManagerLog(logger.LevelError, "no delete executed")
  1218. return errors.New("no delete executed")
  1219. }
  1220. return nil
  1221. }
  1222. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1223. user, err := getUserForEventAction(user)
  1224. if err != nil {
  1225. return err
  1226. }
  1227. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1228. err = user.CheckFsRoot(connectionID)
  1229. defer user.CloseFs() //nolint:errcheck
  1230. if err != nil {
  1231. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1232. }
  1233. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1234. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1235. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1236. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1237. }
  1238. if err = conn.createDirIfMissing(item); err != nil {
  1239. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1240. }
  1241. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1242. }
  1243. return nil
  1244. }
  1245. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1246. conditions dataprovider.ConditionOptions, params *EventParams,
  1247. ) error {
  1248. users, err := params.getUsers()
  1249. if err != nil {
  1250. return fmt.Errorf("unable to get users: %w", err)
  1251. }
  1252. var failures []string
  1253. executed := 0
  1254. for _, user := range users {
  1255. // if sender is set, the conditions have already been evaluated
  1256. if params.sender == "" {
  1257. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1258. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, name conditions don't match",
  1259. user.Username)
  1260. continue
  1261. }
  1262. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1263. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, group name conditions don't match",
  1264. user.Username)
  1265. continue
  1266. }
  1267. }
  1268. executed++
  1269. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1270. failures = append(failures, user.Username)
  1271. continue
  1272. }
  1273. }
  1274. if len(failures) > 0 {
  1275. return fmt.Errorf("fs mkdir failed for users: %+v", failures)
  1276. }
  1277. if executed == 0 {
  1278. eventManagerLog(logger.LevelError, "no mkdir executed")
  1279. return errors.New("no mkdir executed")
  1280. }
  1281. return nil
  1282. }
  1283. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1284. user dataprovider.User,
  1285. ) error {
  1286. user, err := getUserForEventAction(user)
  1287. if err != nil {
  1288. return err
  1289. }
  1290. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1291. err = user.CheckFsRoot(connectionID)
  1292. defer user.CloseFs() //nolint:errcheck
  1293. if err != nil {
  1294. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1295. }
  1296. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1297. for _, item := range renames {
  1298. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1299. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1300. if err = conn.Rename(source, target); err != nil {
  1301. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1302. }
  1303. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1304. }
  1305. return nil
  1306. }
  1307. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1308. user dataprovider.User,
  1309. ) error {
  1310. user, err := getUserForEventAction(user)
  1311. if err != nil {
  1312. return err
  1313. }
  1314. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1315. err = user.CheckFsRoot(connectionID)
  1316. defer user.CloseFs() //nolint:errcheck
  1317. if err != nil {
  1318. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1319. }
  1320. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1321. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1322. if _, err = conn.DoStat(item, 0, false); err != nil {
  1323. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1324. }
  1325. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1326. }
  1327. return nil
  1328. }
  1329. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1330. conditions dataprovider.ConditionOptions, params *EventParams,
  1331. ) error {
  1332. users, err := params.getUsers()
  1333. if err != nil {
  1334. return fmt.Errorf("unable to get users: %w", err)
  1335. }
  1336. var failures []string
  1337. executed := 0
  1338. for _, user := range users {
  1339. // if sender is set, the conditions have already been evaluated
  1340. if params.sender == "" {
  1341. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1342. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, name conditions don't match",
  1343. user.Username)
  1344. continue
  1345. }
  1346. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1347. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, group name conditions don't match",
  1348. user.Username)
  1349. continue
  1350. }
  1351. }
  1352. executed++
  1353. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1354. failures = append(failures, user.Username)
  1355. params.AddError(err)
  1356. continue
  1357. }
  1358. }
  1359. if len(failures) > 0 {
  1360. return fmt.Errorf("fs rename failed for users: %+v", failures)
  1361. }
  1362. if executed == 0 {
  1363. eventManagerLog(logger.LevelError, "no rename executed")
  1364. return errors.New("no rename executed")
  1365. }
  1366. return nil
  1367. }
  1368. func getArchiveBaseDir(paths []string) string {
  1369. var parentDirs []string
  1370. for _, p := range paths {
  1371. parentDirs = append(parentDirs, path.Dir(p))
  1372. }
  1373. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1374. baseDir := "/"
  1375. if len(parentDirs) == 1 {
  1376. baseDir = parentDirs[0]
  1377. }
  1378. return baseDir
  1379. }
  1380. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1381. user dataprovider.User,
  1382. ) error {
  1383. user, err := getUserForEventAction(user)
  1384. if err != nil {
  1385. return err
  1386. }
  1387. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1388. err = user.CheckFsRoot(connectionID)
  1389. defer user.CloseFs() //nolint:errcheck
  1390. if err != nil {
  1391. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1392. }
  1393. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1394. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1395. paths := make([]string, 0, len(c.Paths))
  1396. for idx := range c.Paths {
  1397. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1398. if p == name {
  1399. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1400. }
  1401. paths = append(paths, p)
  1402. }
  1403. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name)
  1404. if err != nil {
  1405. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1406. return fmt.Errorf("unable to create archive: %w", err)
  1407. }
  1408. defer cancelFn()
  1409. paths = util.RemoveDuplicates(paths, false)
  1410. baseDir := getArchiveBaseDir(paths)
  1411. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1412. zipWriter := &zipWriterWrapper{
  1413. Name: name,
  1414. Writer: zip.NewWriter(writer),
  1415. Entries: make(map[string]bool),
  1416. }
  1417. for _, item := range paths {
  1418. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1419. closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
  1420. return err
  1421. }
  1422. }
  1423. if err := zipWriter.Writer.Close(); err != nil {
  1424. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1425. closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
  1426. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1427. }
  1428. return closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err)
  1429. }
  1430. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1431. params *EventParams,
  1432. ) error {
  1433. users, err := params.getUsers()
  1434. if err != nil {
  1435. return fmt.Errorf("unable to get users: %w", err)
  1436. }
  1437. var failures []string
  1438. executed := 0
  1439. for _, user := range users {
  1440. // if sender is set, the conditions have already been evaluated
  1441. if params.sender == "" {
  1442. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1443. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, name conditions don't match",
  1444. user.Username)
  1445. continue
  1446. }
  1447. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1448. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, group name conditions don't match",
  1449. user.Username)
  1450. continue
  1451. }
  1452. }
  1453. executed++
  1454. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1455. failures = append(failures, user.Username)
  1456. params.AddError(err)
  1457. continue
  1458. }
  1459. }
  1460. if len(failures) > 0 {
  1461. return fmt.Errorf("fs existence check failed for users: %+v", failures)
  1462. }
  1463. if executed == 0 {
  1464. eventManagerLog(logger.LevelError, "no existence check executed")
  1465. return errors.New("no existence check executed")
  1466. }
  1467. return nil
  1468. }
  1469. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1470. conditions dataprovider.ConditionOptions, params *EventParams,
  1471. ) error {
  1472. users, err := params.getUsers()
  1473. if err != nil {
  1474. return fmt.Errorf("unable to get users: %w", err)
  1475. }
  1476. var failures []string
  1477. executed := 0
  1478. for _, user := range users {
  1479. // if sender is set, the conditions have already been evaluated
  1480. if params.sender == "" {
  1481. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1482. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, name conditions don't match",
  1483. user.Username)
  1484. continue
  1485. }
  1486. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1487. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, group name conditions don't match",
  1488. user.Username)
  1489. continue
  1490. }
  1491. }
  1492. executed++
  1493. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1494. failures = append(failures, user.Username)
  1495. params.AddError(err)
  1496. continue
  1497. }
  1498. }
  1499. if len(failures) > 0 {
  1500. return fmt.Errorf("fs compress failed for users: %+v", failures)
  1501. }
  1502. if executed == 0 {
  1503. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1504. return errors.New("no file/folder compressed")
  1505. }
  1506. return nil
  1507. }
  1508. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1509. params *EventParams,
  1510. ) error {
  1511. addObjectData := false
  1512. replacements := params.getStringReplacements(addObjectData)
  1513. replacer := strings.NewReplacer(replacements...)
  1514. switch c.Type {
  1515. case dataprovider.FilesystemActionRename:
  1516. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1517. case dataprovider.FilesystemActionDelete:
  1518. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1519. case dataprovider.FilesystemActionMkdirs:
  1520. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1521. case dataprovider.FilesystemActionExist:
  1522. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1523. case dataprovider.FilesystemActionCompress:
  1524. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1525. default:
  1526. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1527. }
  1528. }
  1529. func executeQuotaResetForUser(user dataprovider.User) error {
  1530. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1531. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1532. user.Username, err)
  1533. return err
  1534. }
  1535. if !QuotaScans.AddUserQuotaScan(user.Username) {
  1536. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1537. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1538. }
  1539. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1540. numFiles, size, err := user.ScanQuota()
  1541. if err != nil {
  1542. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1543. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1544. }
  1545. err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
  1546. if err != nil {
  1547. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1548. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1549. }
  1550. return nil
  1551. }
  1552. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1553. users, err := params.getUsers()
  1554. if err != nil {
  1555. return fmt.Errorf("unable to get users: %w", err)
  1556. }
  1557. var failedResets []string
  1558. executed := 0
  1559. for _, user := range users {
  1560. // if sender is set, the conditions have already been evaluated
  1561. if params.sender == "" {
  1562. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1563. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, name conditions don't match",
  1564. user.Username)
  1565. continue
  1566. }
  1567. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1568. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, group name conditions don't match",
  1569. user.Username)
  1570. continue
  1571. }
  1572. }
  1573. executed++
  1574. if err = executeQuotaResetForUser(user); err != nil {
  1575. params.AddError(err)
  1576. failedResets = append(failedResets, user.Username)
  1577. continue
  1578. }
  1579. }
  1580. if len(failedResets) > 0 {
  1581. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  1582. }
  1583. if executed == 0 {
  1584. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1585. return errors.New("no user quota reset executed")
  1586. }
  1587. return nil
  1588. }
  1589. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1590. folders, err := params.getFolders()
  1591. if err != nil {
  1592. return fmt.Errorf("unable to get folders: %w", err)
  1593. }
  1594. var failedResets []string
  1595. executed := 0
  1596. for _, folder := range folders {
  1597. // if sender is set, the conditions have already been evaluated
  1598. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1599. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1600. folder.Name)
  1601. continue
  1602. }
  1603. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1604. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1605. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1606. failedResets = append(failedResets, folder.Name)
  1607. continue
  1608. }
  1609. executed++
  1610. f := vfs.VirtualFolder{
  1611. BaseVirtualFolder: folder,
  1612. VirtualPath: "/",
  1613. }
  1614. numFiles, size, err := f.ScanQuota()
  1615. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1616. if err != nil {
  1617. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1618. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1619. failedResets = append(failedResets, folder.Name)
  1620. continue
  1621. }
  1622. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1623. if err != nil {
  1624. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1625. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1626. failedResets = append(failedResets, folder.Name)
  1627. }
  1628. }
  1629. if len(failedResets) > 0 {
  1630. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  1631. }
  1632. if executed == 0 {
  1633. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1634. return errors.New("no folder quota reset executed")
  1635. }
  1636. return nil
  1637. }
  1638. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1639. users, err := params.getUsers()
  1640. if err != nil {
  1641. return fmt.Errorf("unable to get users: %w", err)
  1642. }
  1643. var failedResets []string
  1644. executed := 0
  1645. for _, user := range users {
  1646. // if sender is set, the conditions have already been evaluated
  1647. if params.sender == "" {
  1648. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1649. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
  1650. user.Username)
  1651. continue
  1652. }
  1653. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1654. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, group name conditions don't match",
  1655. user.Username)
  1656. continue
  1657. }
  1658. }
  1659. executed++
  1660. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1661. if err != nil {
  1662. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1663. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1664. failedResets = append(failedResets, user.Username)
  1665. }
  1666. }
  1667. if len(failedResets) > 0 {
  1668. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  1669. }
  1670. if executed == 0 {
  1671. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1672. return errors.New("no transfer quota reset executed")
  1673. }
  1674. return nil
  1675. }
  1676. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  1677. params *EventParams, actionName string,
  1678. ) error {
  1679. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1680. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  1681. user.Username, err)
  1682. return err
  1683. }
  1684. check := RetentionCheck{
  1685. Folders: folders,
  1686. }
  1687. c := RetentionChecks.Add(check, &user)
  1688. if c == nil {
  1689. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  1690. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  1691. }
  1692. defer func() {
  1693. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  1694. Username: user.Username,
  1695. ActionName: actionName,
  1696. Results: c.results,
  1697. })
  1698. }()
  1699. if err := c.Start(); err != nil {
  1700. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  1701. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  1702. }
  1703. return nil
  1704. }
  1705. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  1706. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  1707. ) error {
  1708. users, err := params.getUsers()
  1709. if err != nil {
  1710. return fmt.Errorf("unable to get users: %w", err)
  1711. }
  1712. var failedChecks []string
  1713. executed := 0
  1714. for _, user := range users {
  1715. // if sender is set, the conditions have already been evaluated
  1716. if params.sender == "" {
  1717. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1718. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, name conditions don't match",
  1719. user.Username)
  1720. continue
  1721. }
  1722. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1723. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, group name conditions don't match",
  1724. user.Username)
  1725. continue
  1726. }
  1727. }
  1728. executed++
  1729. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  1730. failedChecks = append(failedChecks, user.Username)
  1731. params.AddError(err)
  1732. continue
  1733. }
  1734. }
  1735. if len(failedChecks) > 0 {
  1736. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  1737. }
  1738. if executed == 0 {
  1739. eventManagerLog(logger.LevelError, "no retention check executed")
  1740. return errors.New("no retention check executed")
  1741. }
  1742. return nil
  1743. }
  1744. func executeMetadataCheckForUser(user dataprovider.User) error {
  1745. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1746. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1747. user.Username, err)
  1748. return err
  1749. }
  1750. if !ActiveMetadataChecks.Add(user.Username) {
  1751. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  1752. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  1753. }
  1754. defer ActiveMetadataChecks.Remove(user.Username)
  1755. if err := user.CheckMetadataConsistency(); err != nil {
  1756. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  1757. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  1758. }
  1759. return nil
  1760. }
  1761. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1762. users, err := params.getUsers()
  1763. if err != nil {
  1764. return fmt.Errorf("unable to get users: %w", err)
  1765. }
  1766. var failures []string
  1767. var executed int
  1768. for _, user := range users {
  1769. // if sender is set, the conditions have already been evaluated
  1770. if params.sender == "" {
  1771. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1772. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, name conditions don't match",
  1773. user.Username)
  1774. continue
  1775. }
  1776. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1777. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, group name conditions don't match",
  1778. user.Username)
  1779. continue
  1780. }
  1781. }
  1782. executed++
  1783. if err = executeMetadataCheckForUser(user); err != nil {
  1784. params.AddError(err)
  1785. failures = append(failures, user.Username)
  1786. continue
  1787. }
  1788. }
  1789. if len(failures) > 0 {
  1790. return fmt.Errorf("metadata check failed for users: %+v", failures)
  1791. }
  1792. if executed == 0 {
  1793. eventManagerLog(logger.LevelError, "no metadata check executed")
  1794. return errors.New("no metadata check executed")
  1795. }
  1796. return nil
  1797. }
  1798. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  1799. conditions dataprovider.ConditionOptions,
  1800. ) error {
  1801. var err error
  1802. switch action.Type {
  1803. case dataprovider.ActionTypeHTTP:
  1804. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  1805. case dataprovider.ActionTypeCommand:
  1806. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  1807. case dataprovider.ActionTypeEmail:
  1808. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  1809. case dataprovider.ActionTypeBackup:
  1810. var backupPath string
  1811. backupPath, err = dataprovider.ExecuteBackup()
  1812. if err == nil {
  1813. params.setBackupParams(backupPath)
  1814. }
  1815. case dataprovider.ActionTypeUserQuotaReset:
  1816. err = executeUsersQuotaResetRuleAction(conditions, params)
  1817. case dataprovider.ActionTypeFolderQuotaReset:
  1818. err = executeFoldersQuotaResetRuleAction(conditions, params)
  1819. case dataprovider.ActionTypeTransferQuotaReset:
  1820. err = executeTransferQuotaResetRuleAction(conditions, params)
  1821. case dataprovider.ActionTypeDataRetentionCheck:
  1822. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  1823. case dataprovider.ActionTypeMetadataCheck:
  1824. err = executeMetadataCheckRuleAction(conditions, params)
  1825. case dataprovider.ActionTypeFilesystem:
  1826. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  1827. default:
  1828. err = fmt.Errorf("unsupported action type: %d", action.Type)
  1829. }
  1830. if err != nil {
  1831. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  1832. }
  1833. params.AddError(err)
  1834. return err
  1835. }
  1836. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  1837. var errRes error
  1838. for _, rule := range rules {
  1839. var failedActions []string
  1840. paramsCopy := params.getACopy()
  1841. for _, action := range rule.Actions {
  1842. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  1843. startTime := time.Now()
  1844. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  1845. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  1846. action.Name, rule.Name, time.Since(startTime), err)
  1847. failedActions = append(failedActions, action.Name)
  1848. // we return the last error, it is ok for now
  1849. errRes = err
  1850. if action.Options.StopOnFailure {
  1851. break
  1852. }
  1853. } else {
  1854. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  1855. action.Name, rule.Name, time.Since(startTime))
  1856. }
  1857. }
  1858. }
  1859. // execute async actions if any, including failure actions
  1860. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  1861. }
  1862. return errRes
  1863. }
  1864. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  1865. eventManager.addAsyncTask()
  1866. defer eventManager.removeAsyncTask()
  1867. for _, rule := range rules {
  1868. executeRuleAsyncActions(rule, params.getACopy(), nil)
  1869. }
  1870. }
  1871. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  1872. for _, action := range rule.Actions {
  1873. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  1874. startTime := time.Now()
  1875. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1876. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  1877. action.Name, rule.Name, time.Since(startTime), err)
  1878. failedActions = append(failedActions, action.Name)
  1879. if action.Options.StopOnFailure {
  1880. break
  1881. }
  1882. } else {
  1883. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  1884. action.Name, rule.Name, time.Since(startTime))
  1885. }
  1886. }
  1887. }
  1888. if len(failedActions) > 0 {
  1889. params.updateStatusFromError = false
  1890. // execute failure actions
  1891. for _, action := range rule.Actions {
  1892. if action.Options.IsFailureAction {
  1893. startTime := time.Now()
  1894. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1895. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  1896. action.Name, rule.Name, time.Since(startTime), err)
  1897. if action.Options.StopOnFailure {
  1898. break
  1899. }
  1900. } else {
  1901. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  1902. action.Name, rule.Name, time.Since(startTime))
  1903. }
  1904. }
  1905. }
  1906. }
  1907. }
// eventCronJob is a scheduled job bound to a single event rule. Its Run
// method loads the rule by name each time it is executed.
type eventCronJob struct {
	// ruleName is the name used to look up the rule in the data provider.
	ruleName string
}
  1911. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  1912. if rule.GuardFromConcurrentExecution() {
  1913. task, err := dataprovider.GetTaskByName(rule.Name)
  1914. if _, ok := err.(*util.RecordNotFoundError); ok {
  1915. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  1916. task = dataprovider.Task{
  1917. Name: rule.Name,
  1918. UpdateAt: 0,
  1919. Version: 0,
  1920. }
  1921. err = dataprovider.AddTask(rule.Name)
  1922. if err != nil {
  1923. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  1924. return task, err
  1925. }
  1926. } else {
  1927. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  1928. }
  1929. return task, err
  1930. }
  1931. return dataprovider.Task{}, nil
  1932. }
  1933. func (j *eventCronJob) Run() {
  1934. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  1935. rule, err := dataprovider.EventRuleExists(j.ruleName)
  1936. if err != nil {
  1937. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  1938. return
  1939. }
  1940. if err = rule.CheckActionsConsistency(""); err != nil {
  1941. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  1942. return
  1943. }
  1944. task, err := j.getTask(rule)
  1945. if err != nil {
  1946. return
  1947. }
  1948. if task.Name != "" {
  1949. updateInterval := 5 * time.Minute
  1950. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  1951. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  1952. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  1953. return
  1954. }
  1955. err = dataprovider.UpdateTask(rule.Name, task.Version)
  1956. if err != nil {
  1957. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  1958. rule.Name, err)
  1959. return
  1960. }
  1961. ticker := time.NewTicker(updateInterval)
  1962. done := make(chan bool)
  1963. defer func() {
  1964. done <- true
  1965. ticker.Stop()
  1966. }()
  1967. go func(taskName string) {
  1968. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  1969. for {
  1970. select {
  1971. case <-done:
  1972. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  1973. return
  1974. case <-ticker.C:
  1975. err := dataprovider.UpdateTaskTimestamp(taskName)
  1976. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  1977. }
  1978. }
  1979. }(task.Name)
  1980. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  1981. } else {
  1982. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  1983. }
  1984. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  1985. }
// zipWriterWrapper bundles a zip.Writer with the name of the archive being
// created and a set of entries.
type zipWriterWrapper struct {
	// Name is the path of the archive being written.
	Name string
	// Entries presumably records the entry names already added to the
	// archive; confirm against addZipEntry.
	Entries map[string]bool
	// Writer is the underlying zip writer.
	Writer *zip.Writer
}
// eventManagerLog forwards a formatted message to logger.Log at the given
// level, passing "eventmanager" as the second argument and an empty string
// as the third.
func eventManagerLog(level logger.LogLevel, format string, v ...any) {
	logger.Log(level, "eventmanager", "", format, v...)
}