eventmanager.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "mime"
  22. "mime/multipart"
  23. "net/http"
  24. "net/textproto"
  25. "net/url"
  26. "os"
  27. "os/exec"
  28. "path"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/robfig/cron/v3"
  34. "github.com/rs/xid"
  35. "github.com/sftpgo/sdk"
  36. mail "github.com/xhit/go-simple-mail/v2"
  37. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  38. "github.com/drakkan/sftpgo/v2/internal/logger"
  39. "github.com/drakkan/sftpgo/v2/internal/plugin"
  40. "github.com/drakkan/sftpgo/v2/internal/smtp"
  41. "github.com/drakkan/sftpgo/v2/internal/util"
  42. "github.com/drakkan/sftpgo/v2/internal/vfs"
  43. )
  44. const (
  45. ipBlockedEventName = "IP Blocked"
  46. emailAttachmentsMaxSize = int64(10 * 1024 * 1024)
  47. )
  48. var (
  49. // eventManager handle the supported event rules actions
  50. eventManager eventRulesContainer
  51. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  52. )
  53. func init() {
  54. eventManager = eventRulesContainer{
  55. schedulesMapping: make(map[string][]cron.EntryID),
  56. // arbitrary maximum number of concurrent asynchronous tasks,
  57. // each task could execute multiple actions
  58. concurrencyGuard: make(chan struct{}, 200),
  59. }
  60. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  61. func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
  62. eventManager.handleProviderEvent(EventParams{
  63. Name: executor,
  64. ObjectName: objectName,
  65. Event: operation,
  66. Status: 1,
  67. ObjectType: objectType,
  68. IP: ip,
  69. Timestamp: time.Now().UnixNano(),
  70. Object: object,
  71. })
  72. })
  73. }
  74. // HandleCertificateEvent checks and executes action rules for certificate events
  75. func HandleCertificateEvent(params EventParams) {
  76. eventManager.handleCertificateEvent(params)
  77. }
  78. // eventRulesContainer stores event rules by trigger
  79. type eventRulesContainer struct {
  80. sync.RWMutex
  81. lastLoad atomic.Int64
  82. FsEvents []dataprovider.EventRule
  83. ProviderEvents []dataprovider.EventRule
  84. Schedules []dataprovider.EventRule
  85. IPBlockedEvents []dataprovider.EventRule
  86. CertificateEvents []dataprovider.EventRule
  87. schedulesMapping map[string][]cron.EntryID
  88. concurrencyGuard chan struct{}
  89. }
  90. func (r *eventRulesContainer) addAsyncTask() {
  91. r.concurrencyGuard <- struct{}{}
  92. }
  93. func (r *eventRulesContainer) removeAsyncTask() {
  94. <-r.concurrencyGuard
  95. }
  96. func (r *eventRulesContainer) getLastLoadTime() int64 {
  97. return r.lastLoad.Load()
  98. }
  99. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  100. r.lastLoad.Store(modTime)
  101. }
  102. // RemoveRule deletes the rule with the specified name
  103. func (r *eventRulesContainer) RemoveRule(name string) {
  104. r.Lock()
  105. defer r.Unlock()
  106. r.removeRuleInternal(name)
  107. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  108. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  109. }
  110. func (r *eventRulesContainer) removeRuleInternal(name string) {
  111. for idx := range r.FsEvents {
  112. if r.FsEvents[idx].Name == name {
  113. lastIdx := len(r.FsEvents) - 1
  114. r.FsEvents[idx] = r.FsEvents[lastIdx]
  115. r.FsEvents = r.FsEvents[:lastIdx]
  116. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  117. return
  118. }
  119. }
  120. for idx := range r.ProviderEvents {
  121. if r.ProviderEvents[idx].Name == name {
  122. lastIdx := len(r.ProviderEvents) - 1
  123. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  124. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  125. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  126. return
  127. }
  128. }
  129. for idx := range r.IPBlockedEvents {
  130. if r.IPBlockedEvents[idx].Name == name {
  131. lastIdx := len(r.IPBlockedEvents) - 1
  132. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  133. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  134. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  135. return
  136. }
  137. }
  138. for idx := range r.CertificateEvents {
  139. if r.CertificateEvents[idx].Name == name {
  140. lastIdx := len(r.CertificateEvents) - 1
  141. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  142. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  143. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  144. return
  145. }
  146. }
  147. for idx := range r.Schedules {
  148. if r.Schedules[idx].Name == name {
  149. if schedules, ok := r.schedulesMapping[name]; ok {
  150. for _, entryID := range schedules {
  151. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  152. eventScheduler.Remove(entryID)
  153. }
  154. delete(r.schedulesMapping, name)
  155. }
  156. lastIdx := len(r.Schedules) - 1
  157. r.Schedules[idx] = r.Schedules[lastIdx]
  158. r.Schedules = r.Schedules[:lastIdx]
  159. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  160. return
  161. }
  162. }
  163. }
  164. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  165. r.removeRuleInternal(rule.Name)
  166. if rule.DeletedAt > 0 {
  167. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  168. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  169. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  170. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  171. }
  172. return
  173. }
  174. switch rule.Trigger {
  175. case dataprovider.EventTriggerFsEvent:
  176. r.FsEvents = append(r.FsEvents, rule)
  177. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  178. case dataprovider.EventTriggerProviderEvent:
  179. r.ProviderEvents = append(r.ProviderEvents, rule)
  180. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  181. case dataprovider.EventTriggerIPBlocked:
  182. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  183. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  184. case dataprovider.EventTriggerCertificate:
  185. r.CertificateEvents = append(r.CertificateEvents, rule)
  186. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  187. case dataprovider.EventTriggerSchedule:
  188. for _, schedule := range rule.Conditions.Schedules {
  189. cronSpec := schedule.GetCronSpec()
  190. job := &eventCronJob{
  191. ruleName: dataprovider.ConvertName(rule.Name),
  192. }
  193. entryID, err := eventScheduler.AddJob(cronSpec, job)
  194. if err != nil {
  195. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  196. return
  197. }
  198. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  199. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  200. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  201. }
  202. r.Schedules = append(r.Schedules, rule)
  203. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  204. default:
  205. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  206. }
  207. }
  208. func (r *eventRulesContainer) loadRules() {
  209. eventManagerLog(logger.LevelDebug, "loading updated rules")
  210. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  211. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  212. if err != nil {
  213. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  214. return
  215. }
  216. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  217. if len(rules) > 0 {
  218. r.Lock()
  219. defer r.Unlock()
  220. for _, rule := range rules {
  221. r.addUpdateRuleInternal(rule)
  222. }
  223. }
  224. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
  225. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
  226. r.setLastLoadTime(modTime)
  227. }
  228. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  229. if !util.Contains(conditions.ProviderEvents, params.Event) {
  230. return false
  231. }
  232. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  233. return false
  234. }
  235. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  236. return false
  237. }
  238. return true
  239. }
  240. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  241. if !util.Contains(conditions.FsEvents, params.Event) {
  242. return false
  243. }
  244. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  245. return false
  246. }
  247. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  248. return false
  249. }
  250. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  251. if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
  252. return false
  253. }
  254. }
  255. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  256. return false
  257. }
  258. if params.Event == operationUpload || params.Event == operationDownload {
  259. if conditions.Options.MinFileSize > 0 {
  260. if params.FileSize < conditions.Options.MinFileSize {
  261. return false
  262. }
  263. }
  264. if conditions.Options.MaxFileSize > 0 {
  265. if params.FileSize > conditions.Options.MaxFileSize {
  266. return false
  267. }
  268. }
  269. }
  270. return true
  271. }
  272. // hasFsRules returns true if there are any rules for filesystem event triggers
  273. func (r *eventRulesContainer) hasFsRules() bool {
  274. r.RLock()
  275. defer r.RUnlock()
  276. return len(r.FsEvents) > 0
  277. }
  278. // handleFsEvent executes the rules actions defined for the specified event
  279. func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
  280. if params.Protocol == protocolEventAction {
  281. return nil
  282. }
  283. r.RLock()
  284. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  285. for _, rule := range r.FsEvents {
  286. if r.checkFsEventMatch(rule.Conditions, params) {
  287. if err := rule.CheckActionsConsistency(""); err != nil {
  288. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  289. rule.Name, err, params.Event)
  290. continue
  291. }
  292. hasSyncActions := false
  293. for _, action := range rule.Actions {
  294. if action.Options.ExecuteSync {
  295. hasSyncActions = true
  296. break
  297. }
  298. }
  299. if hasSyncActions {
  300. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  301. } else {
  302. rulesAsync = append(rulesAsync, rule)
  303. }
  304. }
  305. }
  306. r.RUnlock()
  307. params.sender = params.Name
  308. if len(rulesAsync) > 0 {
  309. go executeAsyncRulesActions(rulesAsync, params)
  310. }
  311. if len(rulesWithSyncActions) > 0 {
  312. return executeSyncRulesActions(rulesWithSyncActions, params)
  313. }
  314. return nil
  315. }
  316. // username is populated for user objects
  317. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  318. r.RLock()
  319. defer r.RUnlock()
  320. var rules []dataprovider.EventRule
  321. for _, rule := range r.ProviderEvents {
  322. if r.checkProviderEventMatch(rule.Conditions, params) {
  323. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  324. rules = append(rules, rule)
  325. } else {
  326. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  327. rule.Name, err, params.Event, params.ObjectType)
  328. }
  329. }
  330. }
  331. if len(rules) > 0 {
  332. params.sender = params.ObjectName
  333. go executeAsyncRulesActions(rules, params)
  334. }
  335. }
  336. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  337. r.RLock()
  338. defer r.RUnlock()
  339. if len(r.IPBlockedEvents) == 0 {
  340. return
  341. }
  342. var rules []dataprovider.EventRule
  343. for _, rule := range r.IPBlockedEvents {
  344. if err := rule.CheckActionsConsistency(""); err == nil {
  345. rules = append(rules, rule)
  346. } else {
  347. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  348. rule.Name, err, params.Event)
  349. }
  350. }
  351. if len(rules) > 0 {
  352. go executeAsyncRulesActions(rules, params)
  353. }
  354. }
  355. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  356. r.RLock()
  357. defer r.RUnlock()
  358. if len(r.CertificateEvents) == 0 {
  359. return
  360. }
  361. var rules []dataprovider.EventRule
  362. for _, rule := range r.CertificateEvents {
  363. if err := rule.CheckActionsConsistency(""); err == nil {
  364. rules = append(rules, rule)
  365. } else {
  366. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  367. rule.Name, err, params.Event)
  368. }
  369. }
  370. if len(rules) > 0 {
  371. go executeAsyncRulesActions(rules, params)
  372. }
  373. }
  374. // EventParams defines the supported event parameters
  375. type EventParams struct {
  376. Name string
  377. Groups []sdk.GroupMapping
  378. Event string
  379. Status int
  380. VirtualPath string
  381. FsPath string
  382. VirtualTargetPath string
  383. FsTargetPath string
  384. ObjectName string
  385. ObjectType string
  386. FileSize int64
  387. Protocol string
  388. IP string
  389. Timestamp int64
  390. Object plugin.Renderer
  391. sender string
  392. updateStatusFromError bool
  393. errors []string
  394. }
  395. func (p *EventParams) getACopy() *EventParams {
  396. params := *p
  397. params.errors = make([]string, len(p.errors))
  398. copy(params.errors, p.errors)
  399. return &params
  400. }
  401. // AddError adds a new error to the event params and update the status if needed
  402. func (p *EventParams) AddError(err error) {
  403. if err == nil {
  404. return
  405. }
  406. if p.updateStatusFromError && p.Status == 1 {
  407. p.Status = 2
  408. }
  409. p.errors = append(p.errors, err.Error())
  410. }
  411. func (p *EventParams) getStatusString() string {
  412. switch p.Status {
  413. case 1:
  414. return "OK"
  415. default:
  416. return "KO"
  417. }
  418. }
  419. // getUsers returns users with group settings not applied
  420. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  421. if p.sender == "" {
  422. users, err := dataprovider.DumpUsers()
  423. if err != nil {
  424. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  425. return users, errors.New("unable to get users")
  426. }
  427. return users, nil
  428. }
  429. user, err := p.getUserFromSender()
  430. if err != nil {
  431. return nil, err
  432. }
  433. return []dataprovider.User{user}, nil
  434. }
  435. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  436. user, err := dataprovider.UserExists(p.sender)
  437. if err != nil {
  438. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  439. return user, fmt.Errorf("error getting user %q", p.sender)
  440. }
  441. return user, nil
  442. }
  443. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  444. if p.sender == "" {
  445. return dataprovider.DumpFolders()
  446. }
  447. folder, err := dataprovider.GetFolderByName(p.sender)
  448. if err != nil {
  449. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  450. }
  451. return []vfs.BaseVirtualFolder{folder}, nil
  452. }
  453. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  454. replacements := []string{
  455. "{{Name}}", p.Name,
  456. "{{Event}}", p.Event,
  457. "{{Status}}", fmt.Sprintf("%d", p.Status),
  458. "{{VirtualPath}}", p.VirtualPath,
  459. "{{FsPath}}", p.FsPath,
  460. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  461. "{{FsTargetPath}}", p.FsTargetPath,
  462. "{{ObjectName}}", p.ObjectName,
  463. "{{ObjectType}}", p.ObjectType,
  464. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  465. "{{Protocol}}", p.Protocol,
  466. "{{IP}}", p.IP,
  467. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  468. "{{StatusString}}", p.getStatusString(),
  469. }
  470. if len(p.errors) > 0 {
  471. replacements = append(replacements, "{{ErrorString}}", strings.Join(p.errors, ", "))
  472. } else {
  473. replacements = append(replacements, "{{ErrorString}}", "")
  474. }
  475. replacements = append(replacements, "{{ObjectData}}", "")
  476. if addObjectData {
  477. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  478. if err == nil {
  479. replacements[len(replacements)-1] = string(data)
  480. }
  481. }
  482. return replacements
  483. }
  484. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  485. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  486. if err != nil {
  487. return nil, nil, err
  488. }
  489. f, r, cancelFn, err := fs.Open(fsPath, 0)
  490. if err != nil {
  491. return nil, nil, err
  492. }
  493. if cancelFn == nil {
  494. cancelFn = func() {}
  495. }
  496. if f != nil {
  497. return f, cancelFn, nil
  498. }
  499. return r, cancelFn, nil
  500. }
  501. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  502. reader, cancelFn, err := getFileReader(conn, virtualPath)
  503. if err != nil {
  504. return err
  505. }
  506. defer cancelFn()
  507. defer reader.Close()
  508. _, err = io.Copy(w, reader)
  509. return err
  510. }
  511. func getFileContent(conn *BaseConnection, virtualPath string, expectedSize int) ([]byte, error) {
  512. reader, cancelFn, err := getFileReader(conn, virtualPath)
  513. if err != nil {
  514. return nil, err
  515. }
  516. defer cancelFn()
  517. defer reader.Close()
  518. data := make([]byte, expectedSize)
  519. _, err = io.ReadFull(reader, data)
  520. return data, err
  521. }
  522. func getMailAttachments(user dataprovider.User, attachments []string, replacer *strings.Replacer) ([]mail.File, error) {
  523. var files []mail.File
  524. user, err := getUserForEventAction(user)
  525. if err != nil {
  526. return nil, err
  527. }
  528. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  529. err = user.CheckFsRoot(connectionID)
  530. defer user.CloseFs() //nolint:errcheck
  531. if err != nil {
  532. return nil, fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  533. }
  534. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  535. totalSize := int64(0)
  536. for _, virtualPath := range attachments {
  537. virtualPath = util.CleanPath(replaceWithReplacer(virtualPath, replacer))
  538. info, err := conn.DoStat(virtualPath, 0, false)
  539. if err != nil {
  540. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  541. }
  542. if !info.Mode().IsRegular() {
  543. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  544. }
  545. totalSize += info.Size()
  546. if totalSize > emailAttachmentsMaxSize {
  547. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  548. }
  549. data, err := getFileContent(conn, virtualPath, int(info.Size()))
  550. if err != nil {
  551. return nil, fmt.Errorf("unable to get content for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  552. }
  553. files = append(files, mail.File{
  554. Name: path.Base(virtualPath),
  555. Data: data,
  556. })
  557. }
  558. return files, nil
  559. }
  560. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  561. if !strings.Contains(input, "{{") {
  562. return input
  563. }
  564. return replacer.Replace(input)
  565. }
  566. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  567. matched, err := path.Match(p.Pattern, name)
  568. if err != nil {
  569. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  570. return false
  571. }
  572. if p.InverseMatch {
  573. return !matched
  574. }
  575. return matched
  576. }
  577. // checkConditionPatterns returns false if patterns are defined and no match is found
  578. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  579. if len(patterns) == 0 {
  580. return true
  581. }
  582. for _, p := range patterns {
  583. if checkEventConditionPattern(p, name) {
  584. return true
  585. }
  586. }
  587. return false
  588. }
  589. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  590. if len(patterns) == 0 {
  591. return true
  592. }
  593. for _, group := range groups {
  594. for _, p := range patterns {
  595. if checkEventConditionPattern(p, group.Name) {
  596. return true
  597. }
  598. }
  599. }
  600. return false
  601. }
  602. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  603. if len(c.QueryParameters) > 0 {
  604. u, err := url.Parse(c.Endpoint)
  605. if err != nil {
  606. return "", fmt.Errorf("invalid endpoint: %w", err)
  607. }
  608. q := u.Query()
  609. for _, keyVal := range c.QueryParameters {
  610. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  611. }
  612. u.RawQuery = q.Encode()
  613. return u.String(), nil
  614. }
  615. return c.Endpoint, nil
  616. }
  617. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  618. conn *BaseConnection, replacer *strings.Replacer,
  619. ) error {
  620. partWriter, err := m.CreatePart(h)
  621. if err != nil {
  622. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  623. return err
  624. }
  625. if part.Body != "" {
  626. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  627. if err != nil {
  628. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  629. return err
  630. }
  631. return nil
  632. }
  633. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  634. if err != nil {
  635. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  636. return err
  637. }
  638. return nil
  639. }
  640. func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  641. cancel context.CancelFunc, user dataprovider.User,
  642. ) (io.ReadCloser, string, error) {
  643. var body io.ReadCloser
  644. if c.Method == http.MethodGet {
  645. return body, "", nil
  646. }
  647. if c.Body != "" {
  648. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
  649. }
  650. if len(c.Parts) > 0 {
  651. r, w := io.Pipe()
  652. m := multipart.NewWriter(w)
  653. var conn *BaseConnection
  654. if user.Username != "" {
  655. var err error
  656. user, err = getUserForEventAction(user)
  657. if err != nil {
  658. return body, "", err
  659. }
  660. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  661. err = user.CheckFsRoot(connectionID)
  662. if err != nil {
  663. user.CloseFs() //nolint:errcheck
  664. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  665. user.Username, err)
  666. }
  667. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  668. }
  669. go func() {
  670. defer w.Close()
  671. defer user.CloseFs() //nolint:errcheck
  672. for _, part := range c.Parts {
  673. h := make(textproto.MIMEHeader)
  674. if part.Body != "" {
  675. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  676. } else {
  677. filePath := util.CleanPath(replacer.Replace(part.Filepath))
  678. h.Set("Content-Disposition",
  679. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  680. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(filePath))))
  681. contentType := mime.TypeByExtension(path.Ext(filePath))
  682. if contentType == "" {
  683. contentType = "application/octet-stream"
  684. }
  685. h.Set("Content-Type", contentType)
  686. }
  687. for _, keyVal := range part.Headers {
  688. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  689. }
  690. if err := writeHTTPPart(m, part, h, conn, replacer); err != nil {
  691. cancel()
  692. return
  693. }
  694. }
  695. m.Close()
  696. }()
  697. return r, m.FormDataContentType(), nil
  698. }
  699. return body, "", nil
  700. }
  701. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  702. if err := c.TryDecryptPassword(); err != nil {
  703. return err
  704. }
  705. addObjectData := false
  706. if params.Object != nil {
  707. addObjectData = c.HasObjectData()
  708. }
  709. replacements := params.getStringReplacements(addObjectData)
  710. replacer := strings.NewReplacer(replacements...)
  711. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  712. if err != nil {
  713. return err
  714. }
  715. ctx, cancel := c.GetContext()
  716. defer cancel()
  717. var user dataprovider.User
  718. if c.HasMultipartFile() {
  719. user, err = params.getUserFromSender()
  720. if err != nil {
  721. return err
  722. }
  723. }
  724. body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user)
  725. if err != nil {
  726. return err
  727. }
  728. if body != nil {
  729. defer body.Close()
  730. }
  731. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  732. if err != nil {
  733. return err
  734. }
  735. if contentType != "" {
  736. req.Header.Set("Content-Type", contentType)
  737. }
  738. if c.Username != "" {
  739. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  740. }
  741. for _, keyVal := range c.Headers {
  742. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  743. }
  744. client := c.GetHTTPClient()
  745. defer client.CloseIdleConnections()
  746. startTime := time.Now()
  747. resp, err := client.Do(req)
  748. if err != nil {
  749. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  750. endpoint, time.Since(startTime), err)
  751. return fmt.Errorf("error sending HTTP request: %w", err)
  752. }
  753. defer resp.Body.Close()
  754. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  755. endpoint, time.Since(startTime), resp.StatusCode)
  756. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  757. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  758. }
  759. return nil
  760. }
  761. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  762. envVars := make([]string, 0, len(c.EnvVars))
  763. addObjectData := false
  764. if params.Object != nil {
  765. for _, k := range c.EnvVars {
  766. if strings.Contains(k.Value, "{{ObjectData}}") {
  767. addObjectData = true
  768. break
  769. }
  770. }
  771. }
  772. replacements := params.getStringReplacements(addObjectData)
  773. replacer := strings.NewReplacer(replacements...)
  774. for _, keyVal := range c.EnvVars {
  775. envVars = append(envVars, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  776. }
  777. args := make([]string, 0, len(c.Args))
  778. for _, arg := range c.Args {
  779. args = append(args, replaceWithReplacer(arg, replacer))
  780. }
  781. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  782. defer cancel()
  783. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  784. cmd.Env = append(cmd.Env, os.Environ()...)
  785. cmd.Env = append(cmd.Env, envVars...)
  786. startTime := time.Now()
  787. err := cmd.Run()
  788. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  789. c.Cmd, time.Since(startTime), err)
  790. return err
  791. }
  792. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  793. addObjectData := false
  794. if params.Object != nil {
  795. if strings.Contains(c.Body, "{{ObjectData}}") {
  796. addObjectData = true
  797. }
  798. }
  799. replacements := params.getStringReplacements(addObjectData)
  800. replacer := strings.NewReplacer(replacements...)
  801. body := replaceWithReplacer(c.Body, replacer)
  802. subject := replaceWithReplacer(c.Subject, replacer)
  803. startTime := time.Now()
  804. var files []mail.File
  805. if len(c.Attachments) > 0 {
  806. user, err := params.getUserFromSender()
  807. if err != nil {
  808. return err
  809. }
  810. files, err = getMailAttachments(user, c.Attachments, replacer)
  811. if err != nil {
  812. return err
  813. }
  814. }
  815. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
  816. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  817. time.Since(startTime), err)
  818. if err != nil {
  819. return fmt.Errorf("unable to send email: %w", err)
  820. }
  821. return nil
  822. }
  823. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  824. err := user.LoadAndApplyGroupSettings()
  825. if err != nil {
  826. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  827. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  828. }
  829. user.Filters.DisableFsChecks = false
  830. user.Filters.FilePatterns = nil
  831. for k := range user.Permissions {
  832. user.Permissions[k] = []string{dataprovider.PermAny}
  833. }
  834. return user, nil
  835. }
  836. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  837. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  838. if err != nil {
  839. return err
  840. }
  841. return conn.RemoveFile(fs, fsPath, item, info)
  842. }
  843. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  844. user, err := getUserForEventAction(user)
  845. if err != nil {
  846. return err
  847. }
  848. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  849. err = user.CheckFsRoot(connectionID)
  850. defer user.CloseFs() //nolint:errcheck
  851. if err != nil {
  852. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  853. }
  854. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  855. for _, item := range deletes {
  856. item = util.CleanPath(replaceWithReplacer(item, replacer))
  857. info, err := conn.DoStat(item, 0, false)
  858. if err != nil {
  859. if conn.IsNotExistError(err) {
  860. continue
  861. }
  862. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  863. }
  864. if info.IsDir() {
  865. if err = conn.RemoveDir(item); err != nil {
  866. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  867. }
  868. } else {
  869. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  870. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  871. }
  872. }
  873. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  874. }
  875. return nil
  876. }
  877. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  878. conditions dataprovider.ConditionOptions, params *EventParams,
  879. ) error {
  880. users, err := params.getUsers()
  881. if err != nil {
  882. return fmt.Errorf("unable to get users: %w", err)
  883. }
  884. var failures []string
  885. executed := 0
  886. for _, user := range users {
  887. // if sender is set, the conditions have already been evaluated
  888. if params.sender == "" {
  889. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  890. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, name conditions don't match",
  891. user.Username)
  892. continue
  893. }
  894. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  895. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, group name conditions don't match",
  896. user.Username)
  897. continue
  898. }
  899. }
  900. executed++
  901. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  902. params.AddError(err)
  903. failures = append(failures, user.Username)
  904. continue
  905. }
  906. }
  907. if len(failures) > 0 {
  908. return fmt.Errorf("fs delete failed for users: %+v", failures)
  909. }
  910. if executed == 0 {
  911. eventManagerLog(logger.LevelError, "no delete executed")
  912. return errors.New("no delete executed")
  913. }
  914. return nil
  915. }
  916. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  917. user, err := getUserForEventAction(user)
  918. if err != nil {
  919. return err
  920. }
  921. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  922. err = user.CheckFsRoot(connectionID)
  923. defer user.CloseFs() //nolint:errcheck
  924. if err != nil {
  925. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  926. }
  927. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  928. for _, item := range dirs {
  929. item = util.CleanPath(replaceWithReplacer(item, replacer))
  930. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  931. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  932. }
  933. if err = conn.createDirIfMissing(item); err != nil {
  934. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  935. }
  936. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  937. }
  938. return nil
  939. }
  940. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  941. conditions dataprovider.ConditionOptions, params *EventParams,
  942. ) error {
  943. users, err := params.getUsers()
  944. if err != nil {
  945. return fmt.Errorf("unable to get users: %w", err)
  946. }
  947. var failures []string
  948. executed := 0
  949. for _, user := range users {
  950. // if sender is set, the conditions have already been evaluated
  951. if params.sender == "" {
  952. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  953. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, name conditions don't match",
  954. user.Username)
  955. continue
  956. }
  957. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  958. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, group name conditions don't match",
  959. user.Username)
  960. continue
  961. }
  962. }
  963. executed++
  964. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  965. failures = append(failures, user.Username)
  966. continue
  967. }
  968. }
  969. if len(failures) > 0 {
  970. return fmt.Errorf("fs mkdir failed for users: %+v", failures)
  971. }
  972. if executed == 0 {
  973. eventManagerLog(logger.LevelError, "no mkdir executed")
  974. return errors.New("no mkdir executed")
  975. }
  976. return nil
  977. }
  978. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  979. user dataprovider.User,
  980. ) error {
  981. user, err := getUserForEventAction(user)
  982. if err != nil {
  983. return err
  984. }
  985. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  986. err = user.CheckFsRoot(connectionID)
  987. defer user.CloseFs() //nolint:errcheck
  988. if err != nil {
  989. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  990. }
  991. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  992. for _, item := range renames {
  993. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  994. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  995. if err = conn.Rename(source, target); err != nil {
  996. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  997. }
  998. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  999. }
  1000. return nil
  1001. }
  1002. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1003. user dataprovider.User,
  1004. ) error {
  1005. user, err := getUserForEventAction(user)
  1006. if err != nil {
  1007. return err
  1008. }
  1009. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1010. err = user.CheckFsRoot(connectionID)
  1011. defer user.CloseFs() //nolint:errcheck
  1012. if err != nil {
  1013. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1014. }
  1015. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1016. for _, item := range exist {
  1017. item = util.CleanPath(replaceWithReplacer(item, replacer))
  1018. if _, err = conn.DoStat(item, 0, false); err != nil {
  1019. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1020. }
  1021. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1022. }
  1023. return nil
  1024. }
  1025. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1026. conditions dataprovider.ConditionOptions, params *EventParams,
  1027. ) error {
  1028. users, err := params.getUsers()
  1029. if err != nil {
  1030. return fmt.Errorf("unable to get users: %w", err)
  1031. }
  1032. var failures []string
  1033. executed := 0
  1034. for _, user := range users {
  1035. // if sender is set, the conditions have already been evaluated
  1036. if params.sender == "" {
  1037. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1038. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, name conditions don't match",
  1039. user.Username)
  1040. continue
  1041. }
  1042. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1043. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, group name conditions don't match",
  1044. user.Username)
  1045. continue
  1046. }
  1047. }
  1048. executed++
  1049. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1050. failures = append(failures, user.Username)
  1051. params.AddError(err)
  1052. continue
  1053. }
  1054. }
  1055. if len(failures) > 0 {
  1056. return fmt.Errorf("fs rename failed for users: %+v", failures)
  1057. }
  1058. if executed == 0 {
  1059. eventManagerLog(logger.LevelError, "no rename executed")
  1060. return errors.New("no rename executed")
  1061. }
  1062. return nil
  1063. }
  1064. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1065. params *EventParams,
  1066. ) error {
  1067. users, err := params.getUsers()
  1068. if err != nil {
  1069. return fmt.Errorf("unable to get users: %w", err)
  1070. }
  1071. var failures []string
  1072. executed := 0
  1073. for _, user := range users {
  1074. // if sender is set, the conditions have already been evaluated
  1075. if params.sender == "" {
  1076. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1077. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, name conditions don't match",
  1078. user.Username)
  1079. continue
  1080. }
  1081. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1082. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, group name conditions don't match",
  1083. user.Username)
  1084. continue
  1085. }
  1086. }
  1087. executed++
  1088. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1089. failures = append(failures, user.Username)
  1090. params.AddError(err)
  1091. continue
  1092. }
  1093. }
  1094. if len(failures) > 0 {
  1095. return fmt.Errorf("fs existence check failed for users: %+v", failures)
  1096. }
  1097. if executed == 0 {
  1098. eventManagerLog(logger.LevelError, "no existence check executed")
  1099. return errors.New("no existence check executed")
  1100. }
  1101. return nil
  1102. }
  1103. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1104. params *EventParams,
  1105. ) error {
  1106. addObjectData := false
  1107. replacements := params.getStringReplacements(addObjectData)
  1108. replacer := strings.NewReplacer(replacements...)
  1109. switch c.Type {
  1110. case dataprovider.FilesystemActionRename:
  1111. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1112. case dataprovider.FilesystemActionDelete:
  1113. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1114. case dataprovider.FilesystemActionMkdirs:
  1115. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1116. case dataprovider.FilesystemActionExist:
  1117. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1118. default:
  1119. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1120. }
  1121. }
  1122. func executeQuotaResetForUser(user dataprovider.User) error {
  1123. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1124. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1125. user.Username, err)
  1126. return err
  1127. }
  1128. if !QuotaScans.AddUserQuotaScan(user.Username) {
  1129. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1130. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1131. }
  1132. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1133. numFiles, size, err := user.ScanQuota()
  1134. if err != nil {
  1135. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1136. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1137. }
  1138. err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
  1139. if err != nil {
  1140. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1141. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1142. }
  1143. return nil
  1144. }
  1145. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1146. users, err := params.getUsers()
  1147. if err != nil {
  1148. return fmt.Errorf("unable to get users: %w", err)
  1149. }
  1150. var failedResets []string
  1151. executed := 0
  1152. for _, user := range users {
  1153. // if sender is set, the conditions have already been evaluated
  1154. if params.sender == "" {
  1155. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1156. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, name conditions don't match",
  1157. user.Username)
  1158. continue
  1159. }
  1160. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1161. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, group name conditions don't match",
  1162. user.Username)
  1163. continue
  1164. }
  1165. }
  1166. executed++
  1167. if err = executeQuotaResetForUser(user); err != nil {
  1168. params.AddError(err)
  1169. failedResets = append(failedResets, user.Username)
  1170. continue
  1171. }
  1172. }
  1173. if len(failedResets) > 0 {
  1174. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  1175. }
  1176. if executed == 0 {
  1177. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1178. return errors.New("no user quota reset executed")
  1179. }
  1180. return nil
  1181. }
  1182. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1183. folders, err := params.getFolders()
  1184. if err != nil {
  1185. return fmt.Errorf("unable to get folders: %w", err)
  1186. }
  1187. var failedResets []string
  1188. executed := 0
  1189. for _, folder := range folders {
  1190. // if sender is set, the conditions have already been evaluated
  1191. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1192. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1193. folder.Name)
  1194. continue
  1195. }
  1196. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1197. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1198. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1199. failedResets = append(failedResets, folder.Name)
  1200. continue
  1201. }
  1202. executed++
  1203. f := vfs.VirtualFolder{
  1204. BaseVirtualFolder: folder,
  1205. VirtualPath: "/",
  1206. }
  1207. numFiles, size, err := f.ScanQuota()
  1208. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1209. if err != nil {
  1210. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1211. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1212. failedResets = append(failedResets, folder.Name)
  1213. continue
  1214. }
  1215. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1216. if err != nil {
  1217. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1218. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1219. failedResets = append(failedResets, folder.Name)
  1220. }
  1221. }
  1222. if len(failedResets) > 0 {
  1223. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  1224. }
  1225. if executed == 0 {
  1226. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1227. return errors.New("no folder quota reset executed")
  1228. }
  1229. return nil
  1230. }
  1231. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1232. users, err := params.getUsers()
  1233. if err != nil {
  1234. return fmt.Errorf("unable to get users: %w", err)
  1235. }
  1236. var failedResets []string
  1237. executed := 0
  1238. for _, user := range users {
  1239. // if sender is set, the conditions have already been evaluated
  1240. if params.sender == "" {
  1241. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1242. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
  1243. user.Username)
  1244. continue
  1245. }
  1246. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1247. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, group name conditions don't match",
  1248. user.Username)
  1249. continue
  1250. }
  1251. }
  1252. executed++
  1253. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1254. if err != nil {
  1255. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1256. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1257. failedResets = append(failedResets, user.Username)
  1258. }
  1259. }
  1260. if len(failedResets) > 0 {
  1261. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  1262. }
  1263. if executed == 0 {
  1264. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1265. return errors.New("no transfer quota reset executed")
  1266. }
  1267. return nil
  1268. }
  1269. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention) error {
  1270. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1271. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  1272. user.Username, err)
  1273. return err
  1274. }
  1275. check := RetentionCheck{
  1276. Folders: folders,
  1277. }
  1278. c := RetentionChecks.Add(check, &user)
  1279. if c == nil {
  1280. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  1281. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  1282. }
  1283. if err := c.Start(); err != nil {
  1284. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  1285. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  1286. }
  1287. return nil
  1288. }
  1289. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  1290. conditions dataprovider.ConditionOptions, params *EventParams,
  1291. ) error {
  1292. users, err := params.getUsers()
  1293. if err != nil {
  1294. return fmt.Errorf("unable to get users: %w", err)
  1295. }
  1296. var failedChecks []string
  1297. executed := 0
  1298. for _, user := range users {
  1299. // if sender is set, the conditions have already been evaluated
  1300. if params.sender == "" {
  1301. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1302. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, name conditions don't match",
  1303. user.Username)
  1304. continue
  1305. }
  1306. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1307. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, group name conditions don't match",
  1308. user.Username)
  1309. continue
  1310. }
  1311. }
  1312. executed++
  1313. if err = executeDataRetentionCheckForUser(user, config.Folders); err != nil {
  1314. failedChecks = append(failedChecks, user.Username)
  1315. params.AddError(err)
  1316. continue
  1317. }
  1318. }
  1319. if len(failedChecks) > 0 {
  1320. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  1321. }
  1322. if executed == 0 {
  1323. eventManagerLog(logger.LevelError, "no retention check executed")
  1324. return errors.New("no retention check executed")
  1325. }
  1326. return nil
  1327. }
  1328. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams, conditions dataprovider.ConditionOptions) error {
  1329. var err error
  1330. switch action.Type {
  1331. case dataprovider.ActionTypeHTTP:
  1332. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  1333. case dataprovider.ActionTypeCommand:
  1334. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  1335. case dataprovider.ActionTypeEmail:
  1336. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  1337. case dataprovider.ActionTypeBackup:
  1338. err = dataprovider.ExecuteBackup()
  1339. case dataprovider.ActionTypeUserQuotaReset:
  1340. err = executeUsersQuotaResetRuleAction(conditions, params)
  1341. case dataprovider.ActionTypeFolderQuotaReset:
  1342. err = executeFoldersQuotaResetRuleAction(conditions, params)
  1343. case dataprovider.ActionTypeTransferQuotaReset:
  1344. err = executeTransferQuotaResetRuleAction(conditions, params)
  1345. case dataprovider.ActionTypeDataRetentionCheck:
  1346. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
  1347. case dataprovider.ActionTypeFilesystem:
  1348. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  1349. default:
  1350. err = fmt.Errorf("unsupported action type: %d", action.Type)
  1351. }
  1352. if err != nil {
  1353. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  1354. }
  1355. params.AddError(err)
  1356. return err
  1357. }
  1358. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  1359. var errRes error
  1360. for _, rule := range rules {
  1361. var failedActions []string
  1362. paramsCopy := params.getACopy()
  1363. for _, action := range rule.Actions {
  1364. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  1365. startTime := time.Now()
  1366. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  1367. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  1368. action.Name, rule.Name, time.Since(startTime), err)
  1369. failedActions = append(failedActions, action.Name)
  1370. // we return the last error, it is ok for now
  1371. errRes = err
  1372. if action.Options.StopOnFailure {
  1373. break
  1374. }
  1375. } else {
  1376. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  1377. action.Name, rule.Name, time.Since(startTime))
  1378. }
  1379. }
  1380. }
  1381. // execute async actions if any, including failure actions
  1382. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  1383. }
  1384. return errRes
  1385. }
  1386. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  1387. eventManager.addAsyncTask()
  1388. defer eventManager.removeAsyncTask()
  1389. for _, rule := range rules {
  1390. executeRuleAsyncActions(rule, params.getACopy(), nil)
  1391. }
  1392. }
  1393. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  1394. for _, action := range rule.Actions {
  1395. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  1396. startTime := time.Now()
  1397. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1398. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  1399. action.Name, rule.Name, time.Since(startTime), err)
  1400. failedActions = append(failedActions, action.Name)
  1401. if action.Options.StopOnFailure {
  1402. break
  1403. }
  1404. } else {
  1405. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  1406. action.Name, rule.Name, time.Since(startTime))
  1407. }
  1408. }
  1409. }
  1410. if len(failedActions) > 0 {
  1411. params.updateStatusFromError = false
  1412. // execute failure actions
  1413. for _, action := range rule.Actions {
  1414. if action.Options.IsFailureAction {
  1415. startTime := time.Now()
  1416. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1417. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  1418. action.Name, rule.Name, time.Since(startTime), err)
  1419. if action.Options.StopOnFailure {
  1420. break
  1421. }
  1422. } else {
  1423. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  1424. action.Name, rule.Name, time.Since(startTime))
  1425. }
  1426. }
  1427. }
  1428. }
  1429. }
  1430. type eventCronJob struct {
  1431. ruleName string
  1432. }
  1433. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  1434. if rule.GuardFromConcurrentExecution() {
  1435. task, err := dataprovider.GetTaskByName(rule.Name)
  1436. if _, ok := err.(*util.RecordNotFoundError); ok {
  1437. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  1438. task = dataprovider.Task{
  1439. Name: rule.Name,
  1440. UpdateAt: 0,
  1441. Version: 0,
  1442. }
  1443. err = dataprovider.AddTask(rule.Name)
  1444. if err != nil {
  1445. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  1446. return task, err
  1447. }
  1448. } else {
  1449. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  1450. }
  1451. return task, err
  1452. }
  1453. return dataprovider.Task{}, nil
  1454. }
  1455. func (j *eventCronJob) Run() {
  1456. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  1457. rule, err := dataprovider.EventRuleExists(j.ruleName)
  1458. if err != nil {
  1459. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  1460. return
  1461. }
  1462. if err = rule.CheckActionsConsistency(""); err != nil {
  1463. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  1464. return
  1465. }
  1466. task, err := j.getTask(rule)
  1467. if err != nil {
  1468. return
  1469. }
  1470. if task.Name != "" {
  1471. updateInterval := 5 * time.Minute
  1472. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  1473. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  1474. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  1475. return
  1476. }
  1477. err = dataprovider.UpdateTask(rule.Name, task.Version)
  1478. if err != nil {
  1479. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  1480. rule.Name, err)
  1481. return
  1482. }
  1483. ticker := time.NewTicker(updateInterval)
  1484. done := make(chan bool)
  1485. defer func() {
  1486. done <- true
  1487. ticker.Stop()
  1488. }()
  1489. go func(taskName string) {
  1490. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  1491. for {
  1492. select {
  1493. case <-done:
  1494. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  1495. return
  1496. case <-ticker.C:
  1497. err := dataprovider.UpdateTaskTimestamp(taskName)
  1498. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  1499. }
  1500. }
  1501. }(task.Name)
  1502. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  1503. } else {
  1504. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  1505. }
  1506. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  1507. }
  1508. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  1509. logger.Log(level, "eventmanager", "", format, v...)
  1510. }