eventmanager.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. "path"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "github.com/robfig/cron/v3"
  31. "github.com/rs/xid"
  32. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  33. "github.com/drakkan/sftpgo/v2/internal/logger"
  34. "github.com/drakkan/sftpgo/v2/internal/plugin"
  35. "github.com/drakkan/sftpgo/v2/internal/smtp"
  36. "github.com/drakkan/sftpgo/v2/internal/util"
  37. "github.com/drakkan/sftpgo/v2/internal/vfs"
  38. )
  39. const (
  40. ipBlockedEventName = "IP Blocked"
  41. )
  42. var (
  43. // eventManager handle the supported event rules actions
  44. eventManager eventRulesContainer
  45. )
  46. func init() {
  47. eventManager = eventRulesContainer{
  48. schedulesMapping: make(map[string][]cron.EntryID),
  49. // arbitrary maximum number of concurrent asynchronous tasks,
  50. // each task could execute multiple actions
  51. concurrencyGuard: make(chan struct{}, 200),
  52. }
  53. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  54. func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
  55. eventManager.handleProviderEvent(EventParams{
  56. Name: executor,
  57. ObjectName: objectName,
  58. Event: operation,
  59. Status: 1,
  60. ObjectType: objectType,
  61. IP: ip,
  62. Timestamp: time.Now().UnixNano(),
  63. Object: object,
  64. })
  65. })
  66. }
  67. // eventRulesContainer stores event rules by trigger
  68. type eventRulesContainer struct {
  69. sync.RWMutex
  70. lastLoad int64
  71. FsEvents []dataprovider.EventRule
  72. ProviderEvents []dataprovider.EventRule
  73. Schedules []dataprovider.EventRule
  74. IPBlockedEvents []dataprovider.EventRule
  75. schedulesMapping map[string][]cron.EntryID
  76. concurrencyGuard chan struct{}
  77. }
  78. func (r *eventRulesContainer) addAsyncTask() {
  79. r.concurrencyGuard <- struct{}{}
  80. }
  81. func (r *eventRulesContainer) removeAsyncTask() {
  82. <-r.concurrencyGuard
  83. }
  84. func (r *eventRulesContainer) getLastLoadTime() int64 {
  85. return atomic.LoadInt64(&r.lastLoad)
  86. }
  87. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  88. atomic.StoreInt64(&r.lastLoad, modTime)
  89. }
  90. // RemoveRule deletes the rule with the specified name
  91. func (r *eventRulesContainer) RemoveRule(name string) {
  92. r.Lock()
  93. defer r.Unlock()
  94. r.removeRuleInternal(name)
  95. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  96. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  97. }
  98. func (r *eventRulesContainer) removeRuleInternal(name string) {
  99. for idx := range r.FsEvents {
  100. if r.FsEvents[idx].Name == name {
  101. lastIdx := len(r.FsEvents) - 1
  102. r.FsEvents[idx] = r.FsEvents[lastIdx]
  103. r.FsEvents = r.FsEvents[:lastIdx]
  104. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  105. return
  106. }
  107. }
  108. for idx := range r.ProviderEvents {
  109. if r.ProviderEvents[idx].Name == name {
  110. lastIdx := len(r.ProviderEvents) - 1
  111. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  112. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  113. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  114. return
  115. }
  116. }
  117. for idx := range r.IPBlockedEvents {
  118. if r.IPBlockedEvents[idx].Name == name {
  119. lastIdx := len(r.IPBlockedEvents) - 1
  120. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  121. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  122. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  123. return
  124. }
  125. }
  126. for idx := range r.Schedules {
  127. if r.Schedules[idx].Name == name {
  128. if schedules, ok := r.schedulesMapping[name]; ok {
  129. for _, entryID := range schedules {
  130. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  131. eventScheduler.Remove(entryID)
  132. }
  133. delete(r.schedulesMapping, name)
  134. }
  135. lastIdx := len(r.Schedules) - 1
  136. r.Schedules[idx] = r.Schedules[lastIdx]
  137. r.Schedules = r.Schedules[:lastIdx]
  138. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  139. return
  140. }
  141. }
  142. }
  143. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  144. r.removeRuleInternal(rule.Name)
  145. if rule.DeletedAt > 0 {
  146. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  147. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  148. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  149. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  150. }
  151. return
  152. }
  153. switch rule.Trigger {
  154. case dataprovider.EventTriggerFsEvent:
  155. r.FsEvents = append(r.FsEvents, rule)
  156. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  157. case dataprovider.EventTriggerProviderEvent:
  158. r.ProviderEvents = append(r.ProviderEvents, rule)
  159. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  160. case dataprovider.EventTriggerIPBlocked:
  161. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  162. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  163. case dataprovider.EventTriggerSchedule:
  164. for _, schedule := range rule.Conditions.Schedules {
  165. cronSpec := schedule.GetCronSpec()
  166. job := &eventCronJob{
  167. ruleName: dataprovider.ConvertName(rule.Name),
  168. }
  169. entryID, err := eventScheduler.AddJob(cronSpec, job)
  170. if err != nil {
  171. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  172. return
  173. }
  174. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  175. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  176. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  177. }
  178. r.Schedules = append(r.Schedules, rule)
  179. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  180. default:
  181. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  182. }
  183. }
  184. func (r *eventRulesContainer) loadRules() {
  185. eventManagerLog(logger.LevelDebug, "loading updated rules")
  186. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  187. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  188. if err != nil {
  189. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  190. return
  191. }
  192. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  193. if len(rules) > 0 {
  194. r.Lock()
  195. defer r.Unlock()
  196. for _, rule := range rules {
  197. r.addUpdateRuleInternal(rule)
  198. }
  199. }
  200. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d",
  201. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents))
  202. r.setLastLoadTime(modTime)
  203. }
  204. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  205. if !util.Contains(conditions.ProviderEvents, params.Event) {
  206. return false
  207. }
  208. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  209. return false
  210. }
  211. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  212. return false
  213. }
  214. return true
  215. }
  216. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  217. if !util.Contains(conditions.FsEvents, params.Event) {
  218. return false
  219. }
  220. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  221. return false
  222. }
  223. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  224. if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
  225. return false
  226. }
  227. }
  228. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  229. return false
  230. }
  231. if params.Event == operationUpload || params.Event == operationDownload {
  232. if conditions.Options.MinFileSize > 0 {
  233. if params.FileSize < conditions.Options.MinFileSize {
  234. return false
  235. }
  236. }
  237. if conditions.Options.MaxFileSize > 0 {
  238. if params.FileSize > conditions.Options.MaxFileSize {
  239. return false
  240. }
  241. }
  242. }
  243. return true
  244. }
  245. // hasFsRules returns true if there are any rules for filesystem event triggers
  246. func (r *eventRulesContainer) hasFsRules() bool {
  247. r.RLock()
  248. defer r.RUnlock()
  249. return len(r.FsEvents) > 0
  250. }
  251. // handleFsEvent executes the rules actions defined for the specified event
  252. func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
  253. if params.Protocol == protocolEventAction {
  254. return nil
  255. }
  256. r.RLock()
  257. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  258. for _, rule := range r.FsEvents {
  259. if r.checkFsEventMatch(rule.Conditions, params) {
  260. if err := rule.CheckActionsConsistency(""); err != nil {
  261. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  262. rule.Name, err, params.Event)
  263. continue
  264. }
  265. hasSyncActions := false
  266. for _, action := range rule.Actions {
  267. if action.Options.ExecuteSync {
  268. hasSyncActions = true
  269. break
  270. }
  271. }
  272. if hasSyncActions {
  273. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  274. } else {
  275. rulesAsync = append(rulesAsync, rule)
  276. }
  277. }
  278. }
  279. r.RUnlock()
  280. params.sender = params.Name
  281. if len(rulesAsync) > 0 {
  282. go executeAsyncRulesActions(rulesAsync, params)
  283. }
  284. if len(rulesWithSyncActions) > 0 {
  285. return executeSyncRulesActions(rulesWithSyncActions, params)
  286. }
  287. return nil
  288. }
  289. // username is populated for user objects
  290. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  291. r.RLock()
  292. defer r.RUnlock()
  293. var rules []dataprovider.EventRule
  294. for _, rule := range r.ProviderEvents {
  295. if r.checkProviderEventMatch(rule.Conditions, params) {
  296. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  297. rules = append(rules, rule)
  298. } else {
  299. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  300. rule.Name, err, params.Event, params.ObjectType)
  301. }
  302. }
  303. }
  304. if len(rules) > 0 {
  305. params.sender = params.ObjectName
  306. go executeAsyncRulesActions(rules, params)
  307. }
  308. }
  309. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  310. r.RLock()
  311. defer r.RUnlock()
  312. if len(r.IPBlockedEvents) == 0 {
  313. return
  314. }
  315. var rules []dataprovider.EventRule
  316. for _, rule := range r.IPBlockedEvents {
  317. if err := rule.CheckActionsConsistency(""); err == nil {
  318. rules = append(rules, rule)
  319. } else {
  320. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  321. rule.Name, err, params.Event)
  322. }
  323. }
  324. if len(rules) > 0 {
  325. go executeAsyncRulesActions(rules, params)
  326. }
  327. }
  328. // EventParams defines the supported event parameters
  329. type EventParams struct {
  330. Name string
  331. Event string
  332. Status int
  333. VirtualPath string
  334. FsPath string
  335. VirtualTargetPath string
  336. FsTargetPath string
  337. ObjectName string
  338. ObjectType string
  339. FileSize int64
  340. Protocol string
  341. IP string
  342. Timestamp int64
  343. Object plugin.Renderer
  344. sender string
  345. }
  346. // getUsers returns users with group settings not applied
  347. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  348. if p.sender == "" {
  349. return dataprovider.DumpUsers()
  350. }
  351. user, err := dataprovider.UserExists(p.sender)
  352. if err != nil {
  353. return nil, fmt.Errorf("error getting user %q: %w", p.sender, err)
  354. }
  355. return []dataprovider.User{user}, nil
  356. }
  357. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  358. if p.sender == "" {
  359. return dataprovider.DumpFolders()
  360. }
  361. folder, err := dataprovider.GetFolderByName(p.sender)
  362. if err != nil {
  363. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  364. }
  365. return []vfs.BaseVirtualFolder{folder}, nil
  366. }
  367. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  368. replacements := []string{
  369. "{{Name}}", p.Name,
  370. "{{Event}}", p.Event,
  371. "{{Status}}", fmt.Sprintf("%d", p.Status),
  372. "{{VirtualPath}}", p.VirtualPath,
  373. "{{FsPath}}", p.FsPath,
  374. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  375. "{{FsTargetPath}}", p.FsTargetPath,
  376. "{{ObjectName}}", p.ObjectName,
  377. "{{ObjectType}}", p.ObjectType,
  378. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  379. "{{Protocol}}", p.Protocol,
  380. "{{IP}}", p.IP,
  381. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  382. }
  383. if addObjectData {
  384. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  385. if err == nil {
  386. replacements = append(replacements, "{{ObjectData}}", string(data))
  387. }
  388. }
  389. return replacements
  390. }
  391. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  392. if !strings.Contains(input, "{{") {
  393. return input
  394. }
  395. return replacer.Replace(input)
  396. }
  397. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  398. matched, err := path.Match(p.Pattern, name)
  399. if err != nil {
  400. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  401. return false
  402. }
  403. if p.InverseMatch {
  404. return !matched
  405. }
  406. return matched
  407. }
  408. // checkConditionPatterns returns false if patterns are defined and no match is found
  409. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  410. if len(patterns) == 0 {
  411. return true
  412. }
  413. for _, p := range patterns {
  414. if checkEventConditionPattern(p, name) {
  415. return true
  416. }
  417. }
  418. return false
  419. }
  420. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  421. if len(c.QueryParameters) > 0 {
  422. u, err := url.Parse(c.Endpoint)
  423. if err != nil {
  424. return "", fmt.Errorf("invalid endpoint: %w", err)
  425. }
  426. q := u.Query()
  427. for _, keyVal := range c.QueryParameters {
  428. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  429. }
  430. u.RawQuery = q.Encode()
  431. return u.String(), nil
  432. }
  433. return c.Endpoint, nil
  434. }
  435. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params EventParams) error {
  436. if !c.Password.IsEmpty() {
  437. if err := c.Password.TryDecrypt(); err != nil {
  438. return fmt.Errorf("unable to decrypt password: %w", err)
  439. }
  440. }
  441. addObjectData := false
  442. if params.Object != nil {
  443. if !addObjectData {
  444. if strings.Contains(c.Body, "{{ObjectData}}") {
  445. addObjectData = true
  446. }
  447. }
  448. }
  449. replacements := params.getStringReplacements(addObjectData)
  450. replacer := strings.NewReplacer(replacements...)
  451. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  452. if err != nil {
  453. return err
  454. }
  455. var body io.Reader
  456. if c.Body != "" && c.Method != http.MethodGet {
  457. body = bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))
  458. }
  459. req, err := http.NewRequest(c.Method, endpoint, body)
  460. if err != nil {
  461. return err
  462. }
  463. if c.Username != "" {
  464. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetAdditionalData())
  465. }
  466. for _, keyVal := range c.Headers {
  467. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  468. }
  469. client := c.GetHTTPClient()
  470. defer client.CloseIdleConnections()
  471. startTime := time.Now()
  472. resp, err := client.Do(req)
  473. if err != nil {
  474. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  475. endpoint, time.Since(startTime), err)
  476. return err
  477. }
  478. defer resp.Body.Close()
  479. eventManagerLog(logger.LevelDebug, "http notification sent, endopoint: %s, elapsed: %s, status code: %d",
  480. endpoint, time.Since(startTime), resp.StatusCode)
  481. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  482. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  483. }
  484. return nil
  485. }
  486. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params EventParams) error {
  487. envVars := make([]string, 0, len(c.EnvVars))
  488. addObjectData := false
  489. if params.Object != nil {
  490. for _, k := range c.EnvVars {
  491. if strings.Contains(k.Value, "{{ObjectData}}") {
  492. addObjectData = true
  493. break
  494. }
  495. }
  496. }
  497. replacements := params.getStringReplacements(addObjectData)
  498. replacer := strings.NewReplacer(replacements...)
  499. for _, keyVal := range c.EnvVars {
  500. envVars = append(envVars, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  501. }
  502. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  503. defer cancel()
  504. cmd := exec.CommandContext(ctx, c.Cmd)
  505. cmd.Env = append(cmd.Env, os.Environ()...)
  506. cmd.Env = append(cmd.Env, envVars...)
  507. startTime := time.Now()
  508. err := cmd.Run()
  509. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  510. c.Cmd, time.Since(startTime), err)
  511. return err
  512. }
  513. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params EventParams) error {
  514. addObjectData := false
  515. if params.Object != nil {
  516. if strings.Contains(c.Body, "{{ObjectData}}") {
  517. addObjectData = true
  518. }
  519. }
  520. replacements := params.getStringReplacements(addObjectData)
  521. replacer := strings.NewReplacer(replacements...)
  522. body := replaceWithReplacer(c.Body, replacer)
  523. subject := replaceWithReplacer(c.Subject, replacer)
  524. startTime := time.Now()
  525. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain)
  526. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  527. time.Since(startTime), err)
  528. return err
  529. }
  530. func getUserForEventAction(username string) (dataprovider.User, error) {
  531. user, err := dataprovider.GetUserWithGroupSettings(username)
  532. if err != nil {
  533. return dataprovider.User{}, err
  534. }
  535. user.Filters.DisableFsChecks = false
  536. user.Filters.FilePatterns = nil
  537. for k := range user.Permissions {
  538. user.Permissions[k] = []string{dataprovider.PermAny}
  539. }
  540. return user, err
  541. }
  542. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  543. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  544. if err != nil {
  545. return err
  546. }
  547. return conn.RemoveFile(fs, fsPath, item, info)
  548. }
  549. func executeDeleteFsAction(deletes []string, replacer *strings.Replacer, username string) error {
  550. user, err := getUserForEventAction(username)
  551. if err != nil {
  552. return err
  553. }
  554. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  555. err = user.CheckFsRoot(connectionID)
  556. defer user.CloseFs() //nolint:errcheck
  557. if err != nil {
  558. return err
  559. }
  560. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  561. for _, item := range deletes {
  562. item = replaceWithReplacer(item, replacer)
  563. info, err := conn.DoStat(item, 0, false)
  564. if err != nil {
  565. if conn.IsNotExistError(err) {
  566. continue
  567. }
  568. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  569. }
  570. if info.IsDir() {
  571. if err = conn.RemoveDir(item); err != nil {
  572. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  573. }
  574. } else {
  575. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  576. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  577. }
  578. }
  579. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  580. }
  581. return nil
  582. }
  583. func executeMkDirsFsAction(dirs []string, replacer *strings.Replacer, username string) error {
  584. user, err := getUserForEventAction(username)
  585. if err != nil {
  586. return err
  587. }
  588. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  589. err = user.CheckFsRoot(connectionID)
  590. defer user.CloseFs() //nolint:errcheck
  591. if err != nil {
  592. return err
  593. }
  594. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  595. for _, item := range dirs {
  596. item = replaceWithReplacer(item, replacer)
  597. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  598. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  599. }
  600. if err = conn.createDirIfMissing(item); err != nil {
  601. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  602. }
  603. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  604. }
  605. return nil
  606. }
  607. func executeRenameFsAction(renames []dataprovider.KeyValue, replacer *strings.Replacer, username string) error {
  608. user, err := getUserForEventAction(username)
  609. if err != nil {
  610. return err
  611. }
  612. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  613. err = user.CheckFsRoot(connectionID)
  614. defer user.CloseFs() //nolint:errcheck
  615. if err != nil {
  616. return err
  617. }
  618. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  619. for _, item := range renames {
  620. source := replaceWithReplacer(item.Key, replacer)
  621. target := replaceWithReplacer(item.Value, replacer)
  622. if err = conn.Rename(source, target); err != nil {
  623. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  624. }
  625. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  626. }
  627. return nil
  628. }
  629. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, params EventParams) error {
  630. addObjectData := false
  631. replacements := params.getStringReplacements(addObjectData)
  632. replacer := strings.NewReplacer(replacements...)
  633. switch c.Type {
  634. case dataprovider.FilesystemActionRename:
  635. return executeRenameFsAction(c.Renames, replacer, params.sender)
  636. case dataprovider.FilesystemActionDelete:
  637. return executeDeleteFsAction(c.Deletes, replacer, params.sender)
  638. case dataprovider.FilesystemActionMkdirs:
  639. return executeMkDirsFsAction(c.MkDirs, replacer, params.sender)
  640. default:
  641. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  642. }
  643. }
  644. func executeQuotaResetForUser(user dataprovider.User) error {
  645. if err := user.LoadAndApplyGroupSettings(); err != nil {
  646. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  647. user.Username, err)
  648. return err
  649. }
  650. if !QuotaScans.AddUserQuotaScan(user.Username) {
  651. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %s", user.Username)
  652. return fmt.Errorf("another quota scan is in progress for user %s", user.Username)
  653. }
  654. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  655. numFiles, size, err := user.ScanQuota()
  656. if err != nil {
  657. eventManagerLog(logger.LevelError, "error scanning quota for user %s: %v", user.Username, err)
  658. return err
  659. }
  660. err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
  661. if err != nil {
  662. eventManagerLog(logger.LevelError, "error updating quota for user %s: %v", user.Username, err)
  663. return err
  664. }
  665. return nil
  666. }
  667. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
  668. users, err := params.getUsers()
  669. if err != nil {
  670. return fmt.Errorf("unable to get users: %w", err)
  671. }
  672. var failedResets []string
  673. executed := 0
  674. for _, user := range users {
  675. // if sender is set, the conditions have already been evaluated
  676. if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
  677. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, name conditions don't match",
  678. user.Username)
  679. continue
  680. }
  681. executed++
  682. if err = executeQuotaResetForUser(user); err != nil {
  683. failedResets = append(failedResets, user.Username)
  684. continue
  685. }
  686. }
  687. if len(failedResets) > 0 {
  688. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  689. }
  690. if executed == 0 {
  691. eventManagerLog(logger.LevelError, "no user quota reset executed")
  692. return errors.New("no user quota reset executed")
  693. }
  694. return nil
  695. }
  696. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
  697. folders, err := params.getFolders()
  698. if err != nil {
  699. return fmt.Errorf("unable to get folders: %w", err)
  700. }
  701. var failedResets []string
  702. executed := 0
  703. for _, folder := range folders {
  704. // if sender is set, the conditions have already been evaluated
  705. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  706. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  707. folder.Name)
  708. continue
  709. }
  710. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  711. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %s", folder.Name)
  712. failedResets = append(failedResets, folder.Name)
  713. continue
  714. }
  715. executed++
  716. f := vfs.VirtualFolder{
  717. BaseVirtualFolder: folder,
  718. VirtualPath: "/",
  719. }
  720. numFiles, size, err := f.ScanQuota()
  721. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  722. if err != nil {
  723. eventManagerLog(logger.LevelError, "error scanning quota for folder %s: %v", folder.Name, err)
  724. failedResets = append(failedResets, folder.Name)
  725. continue
  726. }
  727. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  728. if err != nil {
  729. eventManagerLog(logger.LevelError, "error updating quota for folder %s: %v", folder.Name, err)
  730. failedResets = append(failedResets, folder.Name)
  731. }
  732. }
  733. if len(failedResets) > 0 {
  734. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  735. }
  736. if executed == 0 {
  737. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  738. return errors.New("no folder quota reset executed")
  739. }
  740. return nil
  741. }
  742. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
  743. users, err := params.getUsers()
  744. if err != nil {
  745. return fmt.Errorf("unable to get users: %w", err)
  746. }
  747. var failedResets []string
  748. executed := 0
  749. for _, user := range users {
  750. // if sender is set, the conditions have already been evaluated
  751. if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
  752. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
  753. user.Username)
  754. continue
  755. }
  756. executed++
  757. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  758. if err != nil {
  759. eventManagerLog(logger.LevelError, "error updating transfer quota for user %s: %v", user.Username, err)
  760. failedResets = append(failedResets, user.Username)
  761. }
  762. }
  763. if len(failedResets) > 0 {
  764. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  765. }
  766. if executed == 0 {
  767. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  768. return errors.New("no transfer quota reset executed")
  769. }
  770. return nil
  771. }
  772. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention) error {
  773. if err := user.LoadAndApplyGroupSettings(); err != nil {
  774. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  775. user.Username, err)
  776. return err
  777. }
  778. check := RetentionCheck{
  779. Folders: folders,
  780. }
  781. c := RetentionChecks.Add(check, &user)
  782. if c == nil {
  783. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %s", user.Username)
  784. return fmt.Errorf("another retention check is in progress for user %s", user.Username)
  785. }
  786. if err := c.Start(); err != nil {
  787. eventManagerLog(logger.LevelError, "error checking retention for user %s: %v", user.Username, err)
  788. return err
  789. }
  790. return nil
  791. }
  792. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  793. conditions dataprovider.ConditionOptions, params EventParams,
  794. ) error {
  795. users, err := params.getUsers()
  796. if err != nil {
  797. return fmt.Errorf("unable to get users: %w", err)
  798. }
  799. var failedChecks []string
  800. executed := 0
  801. for _, user := range users {
  802. // if sender is set, the conditions have already been evaluated
  803. if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
  804. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, name conditions don't match",
  805. user.Username)
  806. continue
  807. }
  808. executed++
  809. if err = executeDataRetentionCheckForUser(user, config.Folders); err != nil {
  810. failedChecks = append(failedChecks, user.Username)
  811. continue
  812. }
  813. }
  814. if len(failedChecks) > 0 {
  815. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  816. }
  817. if executed == 0 {
  818. eventManagerLog(logger.LevelError, "no retention check executed")
  819. return errors.New("no retention check executed")
  820. }
  821. return nil
  822. }
  823. func executeRuleAction(action dataprovider.BaseEventAction, params EventParams, conditions dataprovider.ConditionOptions) error {
  824. switch action.Type {
  825. case dataprovider.ActionTypeHTTP:
  826. return executeHTTPRuleAction(action.Options.HTTPConfig, params)
  827. case dataprovider.ActionTypeCommand:
  828. return executeCommandRuleAction(action.Options.CmdConfig, params)
  829. case dataprovider.ActionTypeEmail:
  830. return executeEmailRuleAction(action.Options.EmailConfig, params)
  831. case dataprovider.ActionTypeBackup:
  832. return dataprovider.ExecuteBackup()
  833. case dataprovider.ActionTypeUserQuotaReset:
  834. return executeUsersQuotaResetRuleAction(conditions, params)
  835. case dataprovider.ActionTypeFolderQuotaReset:
  836. return executeFoldersQuotaResetRuleAction(conditions, params)
  837. case dataprovider.ActionTypeTransferQuotaReset:
  838. return executeTransferQuotaResetRuleAction(conditions, params)
  839. case dataprovider.ActionTypeDataRetentionCheck:
  840. return executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
  841. case dataprovider.ActionTypeFilesystem:
  842. return executeFsRuleAction(action.Options.FsConfig, params)
  843. default:
  844. return fmt.Errorf("unsupported action type: %d", action.Type)
  845. }
  846. }
  847. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  848. var errRes error
  849. for _, rule := range rules {
  850. var failedActions []string
  851. for _, action := range rule.Actions {
  852. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  853. startTime := time.Now()
  854. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  855. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  856. action.Name, rule.Name, time.Since(startTime), err)
  857. failedActions = append(failedActions, action.Name)
  858. // we return the last error, it is ok for now
  859. errRes = err
  860. if action.Options.StopOnFailure {
  861. break
  862. }
  863. } else {
  864. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  865. action.Name, rule.Name, time.Since(startTime))
  866. }
  867. }
  868. }
  869. // execute async actions if any, including failure actions
  870. go executeRuleAsyncActions(rule, params, failedActions)
  871. }
  872. return errRes
  873. }
  874. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  875. eventManager.addAsyncTask()
  876. defer eventManager.removeAsyncTask()
  877. for _, rule := range rules {
  878. executeRuleAsyncActions(rule, params, nil)
  879. }
  880. }
  881. func executeRuleAsyncActions(rule dataprovider.EventRule, params EventParams, failedActions []string) {
  882. for _, action := range rule.Actions {
  883. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  884. startTime := time.Now()
  885. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  886. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  887. action.Name, rule.Name, time.Since(startTime), err)
  888. failedActions = append(failedActions, action.Name)
  889. if action.Options.StopOnFailure {
  890. break
  891. }
  892. } else {
  893. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  894. action.Name, rule.Name, time.Since(startTime))
  895. }
  896. }
  897. }
  898. if len(failedActions) > 0 {
  899. // execute failure actions
  900. for _, action := range rule.Actions {
  901. if action.Options.IsFailureAction {
  902. startTime := time.Now()
  903. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  904. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  905. action.Name, rule.Name, time.Since(startTime), err)
  906. if action.Options.StopOnFailure {
  907. break
  908. }
  909. } else {
  910. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  911. action.Name, rule.Name, time.Since(startTime))
  912. }
  913. }
  914. }
  915. }
  916. }
  917. type eventCronJob struct {
  918. ruleName string
  919. }
  920. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  921. if rule.GuardFromConcurrentExecution() {
  922. task, err := dataprovider.GetTaskByName(rule.Name)
  923. if _, ok := err.(*util.RecordNotFoundError); ok {
  924. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  925. task = dataprovider.Task{
  926. Name: rule.Name,
  927. UpdateAt: 0,
  928. Version: 0,
  929. }
  930. err = dataprovider.AddTask(rule.Name)
  931. if err != nil {
  932. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  933. return task, err
  934. }
  935. } else {
  936. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  937. }
  938. return task, err
  939. }
  940. return dataprovider.Task{}, nil
  941. }
  942. func (j *eventCronJob) Run() {
  943. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  944. rule, err := dataprovider.EventRuleExists(j.ruleName)
  945. if err != nil {
  946. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  947. return
  948. }
  949. if err = rule.CheckActionsConsistency(""); err != nil {
  950. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  951. return
  952. }
  953. task, err := j.getTask(rule)
  954. if err != nil {
  955. return
  956. }
  957. if task.Name != "" {
  958. updateInterval := 5 * time.Minute
  959. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  960. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  961. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  962. return
  963. }
  964. err = dataprovider.UpdateTask(rule.Name, task.Version)
  965. if err != nil {
  966. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  967. rule.Name, err)
  968. return
  969. }
  970. ticker := time.NewTicker(updateInterval)
  971. done := make(chan bool)
  972. defer func() {
  973. done <- true
  974. ticker.Stop()
  975. }()
  976. go func(taskName string) {
  977. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  978. for {
  979. select {
  980. case <-done:
  981. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  982. return
  983. case <-ticker.C:
  984. err := dataprovider.UpdateTaskTimestamp(taskName)
  985. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  986. }
  987. }
  988. }(task.Name)
  989. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
  990. } else {
  991. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
  992. }
  993. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  994. }
  995. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  996. logger.Log(level, "eventmanager", "", format, v...)
  997. }