eventmanager.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "net/http"
  21. "net/url"
  22. "os"
  23. "os/exec"
  24. "path"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "github.com/robfig/cron/v3"
  30. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  31. "github.com/drakkan/sftpgo/v2/internal/logger"
  32. "github.com/drakkan/sftpgo/v2/internal/plugin"
  33. "github.com/drakkan/sftpgo/v2/internal/smtp"
  34. "github.com/drakkan/sftpgo/v2/internal/util"
  35. "github.com/drakkan/sftpgo/v2/internal/vfs"
  36. )
  37. var (
  38. // eventManager handle the supported event rules actions
  39. eventManager eventRulesContainer
  40. )
  41. func init() {
  42. eventManager = eventRulesContainer{
  43. schedulesMapping: make(map[string][]cron.EntryID),
  44. }
  45. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  46. func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
  47. eventManager.handleProviderEvent(EventParams{
  48. Name: executor,
  49. ObjectName: objectName,
  50. Event: operation,
  51. Status: 1,
  52. ObjectType: objectType,
  53. IP: ip,
  54. Timestamp: time.Now().UnixNano(),
  55. Object: object,
  56. })
  57. })
  58. }
  59. // eventRulesContainer stores event rules by trigger
  60. type eventRulesContainer struct {
  61. sync.RWMutex
  62. FsEvents []dataprovider.EventRule
  63. ProviderEvents []dataprovider.EventRule
  64. Schedules []dataprovider.EventRule
  65. schedulesMapping map[string][]cron.EntryID
  66. lastLoad int64
  67. }
  68. func (r *eventRulesContainer) getLastLoadTime() int64 {
  69. return atomic.LoadInt64(&r.lastLoad)
  70. }
  71. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  72. atomic.StoreInt64(&r.lastLoad, modTime)
  73. }
  74. // RemoveRule deletes the rule with the specified name
  75. func (r *eventRulesContainer) RemoveRule(name string) {
  76. r.Lock()
  77. defer r.Unlock()
  78. r.removeRuleInternal(name)
  79. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  80. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  81. }
  82. func (r *eventRulesContainer) removeRuleInternal(name string) {
  83. for idx := range r.FsEvents {
  84. if r.FsEvents[idx].Name == name {
  85. lastIdx := len(r.FsEvents) - 1
  86. r.FsEvents[idx] = r.FsEvents[lastIdx]
  87. r.FsEvents = r.FsEvents[:lastIdx]
  88. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  89. return
  90. }
  91. }
  92. for idx := range r.ProviderEvents {
  93. if r.ProviderEvents[idx].Name == name {
  94. lastIdx := len(r.ProviderEvents) - 1
  95. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  96. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  97. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  98. return
  99. }
  100. }
  101. for idx := range r.Schedules {
  102. if r.Schedules[idx].Name == name {
  103. if schedules, ok := r.schedulesMapping[name]; ok {
  104. for _, entryID := range schedules {
  105. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  106. eventScheduler.Remove(entryID)
  107. }
  108. delete(r.schedulesMapping, name)
  109. }
  110. lastIdx := len(r.Schedules) - 1
  111. r.Schedules[idx] = r.Schedules[lastIdx]
  112. r.Schedules = r.Schedules[:lastIdx]
  113. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  114. return
  115. }
  116. }
  117. }
  118. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  119. r.removeRuleInternal(rule.Name)
  120. if rule.DeletedAt > 0 {
  121. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  122. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  123. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  124. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  125. }
  126. return
  127. }
  128. switch rule.Trigger {
  129. case dataprovider.EventTriggerFsEvent:
  130. r.FsEvents = append(r.FsEvents, rule)
  131. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  132. case dataprovider.EventTriggerProviderEvent:
  133. r.ProviderEvents = append(r.ProviderEvents, rule)
  134. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  135. case dataprovider.EventTriggerSchedule:
  136. for _, schedule := range rule.Conditions.Schedules {
  137. cronSpec := schedule.GetCronSpec()
  138. job := &eventCronJob{
  139. ruleName: dataprovider.ConvertName(rule.Name),
  140. }
  141. entryID, err := eventScheduler.AddJob(cronSpec, job)
  142. if err != nil {
  143. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  144. return
  145. }
  146. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  147. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  148. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  149. }
  150. r.Schedules = append(r.Schedules, rule)
  151. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  152. default:
  153. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  154. }
  155. }
  156. func (r *eventRulesContainer) loadRules() {
  157. eventManagerLog(logger.LevelDebug, "loading updated rules")
  158. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  159. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  160. if err != nil {
  161. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  162. return
  163. }
  164. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  165. if len(rules) > 0 {
  166. r.Lock()
  167. defer r.Unlock()
  168. for _, rule := range rules {
  169. r.addUpdateRuleInternal(rule)
  170. }
  171. }
  172. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d",
  173. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  174. r.setLastLoadTime(modTime)
  175. }
  176. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  177. if !util.Contains(conditions.ProviderEvents, params.Event) {
  178. return false
  179. }
  180. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  181. return false
  182. }
  183. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  184. return false
  185. }
  186. return true
  187. }
  188. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  189. if !util.Contains(conditions.FsEvents, params.Event) {
  190. return false
  191. }
  192. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  193. return false
  194. }
  195. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  196. if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
  197. return false
  198. }
  199. }
  200. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  201. return false
  202. }
  203. if params.Event == operationUpload || params.Event == operationDownload {
  204. if conditions.Options.MinFileSize > 0 {
  205. if params.FileSize < conditions.Options.MinFileSize {
  206. return false
  207. }
  208. }
  209. if conditions.Options.MaxFileSize > 0 {
  210. if params.FileSize > conditions.Options.MaxFileSize {
  211. return false
  212. }
  213. }
  214. }
  215. return true
  216. }
  217. // hasFsRules returns true if there are any rules for filesystem event triggers
  218. func (r *eventRulesContainer) hasFsRules() bool {
  219. r.RLock()
  220. defer r.RUnlock()
  221. return len(r.FsEvents) > 0
  222. }
  223. // handleFsEvent executes the rules actions defined for the specified event
  224. func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
  225. r.RLock()
  226. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  227. for _, rule := range r.FsEvents {
  228. if r.checkFsEventMatch(rule.Conditions, params) {
  229. hasSyncActions := false
  230. for _, action := range rule.Actions {
  231. if action.Options.ExecuteSync {
  232. hasSyncActions = true
  233. break
  234. }
  235. }
  236. if hasSyncActions {
  237. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  238. } else {
  239. rulesAsync = append(rulesAsync, rule)
  240. }
  241. }
  242. }
  243. r.RUnlock()
  244. if len(rulesAsync) > 0 {
  245. go executeAsyncRulesActions(rulesAsync, params)
  246. }
  247. if len(rulesWithSyncActions) > 0 {
  248. return executeSyncRulesActions(rulesWithSyncActions, params)
  249. }
  250. return nil
  251. }
  252. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  253. r.RLock()
  254. defer r.RUnlock()
  255. var rules []dataprovider.EventRule
  256. for _, rule := range r.ProviderEvents {
  257. if r.checkProviderEventMatch(rule.Conditions, params) {
  258. rules = append(rules, rule)
  259. }
  260. }
  261. if len(rules) > 0 {
  262. go executeAsyncRulesActions(rules, params)
  263. }
  264. }
  265. // EventParams defines the supported event parameters
  266. type EventParams struct {
  267. Name string
  268. Event string
  269. Status int
  270. VirtualPath string
  271. FsPath string
  272. VirtualTargetPath string
  273. FsTargetPath string
  274. ObjectName string
  275. ObjectType string
  276. FileSize int64
  277. Protocol string
  278. IP string
  279. Timestamp int64
  280. Object plugin.Renderer
  281. }
  282. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  283. replacements := []string{
  284. "{{Name}}", p.Name,
  285. "{{Event}}", p.Event,
  286. "{{Status}}", fmt.Sprintf("%d", p.Status),
  287. "{{VirtualPath}}", p.VirtualPath,
  288. "{{FsPath}}", p.FsPath,
  289. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  290. "{{FsTargetPath}}", p.FsTargetPath,
  291. "{{ObjectName}}", p.ObjectName,
  292. "{{ObjectType}}", p.ObjectType,
  293. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  294. "{{Protocol}}", p.Protocol,
  295. "{{IP}}", p.IP,
  296. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  297. }
  298. if addObjectData {
  299. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  300. if err == nil {
  301. replacements = append(replacements, "{{ObjectData}}", string(data))
  302. }
  303. }
  304. return replacements
  305. }
  // replaceWithReplacer applies replacer to input. Inputs that do not contain
  // the "{{" placeholder opening are returned as is, without invoking the
  // replacer.
  func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  	if strings.Contains(input, "{{") {
  		return replacer.Replace(input)
  	}
  	return input
  }
  312. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  313. matched, err := path.Match(p.Pattern, name)
  314. if err != nil {
  315. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  316. return false
  317. }
  318. if p.InverseMatch {
  319. return !matched
  320. }
  321. return matched
  322. }
  323. // checkConditionPatterns returns false if patterns are defined and no match is found
  324. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  325. if len(patterns) == 0 {
  326. return true
  327. }
  328. for _, p := range patterns {
  329. if checkEventConditionPattern(p, name) {
  330. return true
  331. }
  332. }
  333. return false
  334. }
  335. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  336. if len(c.QueryParameters) > 0 {
  337. u, err := url.Parse(c.Endpoint)
  338. if err != nil {
  339. return "", fmt.Errorf("invalid endpoint: %w", err)
  340. }
  341. q := u.Query()
  342. for _, keyVal := range c.QueryParameters {
  343. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  344. }
  345. u.RawQuery = q.Encode()
  346. return u.String(), nil
  347. }
  348. return c.Endpoint, nil
  349. }
  350. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params EventParams) error {
  351. if !c.Password.IsEmpty() {
  352. if err := c.Password.TryDecrypt(); err != nil {
  353. return fmt.Errorf("unable to decrypt password: %w", err)
  354. }
  355. }
  356. addObjectData := false
  357. if params.Object != nil {
  358. if !addObjectData {
  359. if strings.Contains(c.Body, "{{ObjectData}}") {
  360. addObjectData = true
  361. }
  362. }
  363. }
  364. replacements := params.getStringReplacements(addObjectData)
  365. replacer := strings.NewReplacer(replacements...)
  366. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  367. if err != nil {
  368. return err
  369. }
  370. var body io.Reader
  371. if c.Body != "" && c.Method != http.MethodGet {
  372. body = bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))
  373. }
  374. req, err := http.NewRequest(c.Method, endpoint, body)
  375. if err != nil {
  376. return err
  377. }
  378. if c.Username != "" {
  379. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetAdditionalData())
  380. }
  381. for _, keyVal := range c.Headers {
  382. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  383. }
  384. client := c.GetHTTPClient()
  385. defer client.CloseIdleConnections()
  386. startTime := time.Now()
  387. resp, err := client.Do(req)
  388. if err != nil {
  389. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  390. endpoint, time.Since(startTime), err)
  391. return err
  392. }
  393. defer resp.Body.Close()
  394. eventManagerLog(logger.LevelDebug, "http notification sent, endopoint: %s, elapsed: %s, status code: %d",
  395. endpoint, time.Since(startTime), resp.StatusCode)
  396. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  397. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  398. }
  399. return nil
  400. }
  401. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params EventParams) error {
  402. envVars := make([]string, 0, len(c.EnvVars))
  403. addObjectData := false
  404. if params.Object != nil {
  405. for _, k := range c.EnvVars {
  406. if strings.Contains(k.Value, "{{ObjectData}}") {
  407. addObjectData = true
  408. break
  409. }
  410. }
  411. }
  412. replacements := params.getStringReplacements(addObjectData)
  413. replacer := strings.NewReplacer(replacements...)
  414. for _, keyVal := range c.EnvVars {
  415. envVars = append(envVars, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  416. }
  417. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  418. defer cancel()
  419. cmd := exec.CommandContext(ctx, c.Cmd)
  420. cmd.Env = append(cmd.Env, os.Environ()...)
  421. cmd.Env = append(cmd.Env, envVars...)
  422. startTime := time.Now()
  423. err := cmd.Run()
  424. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  425. c.Cmd, time.Since(startTime), err)
  426. return err
  427. }
  428. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params EventParams) error {
  429. addObjectData := false
  430. if params.Object != nil {
  431. if strings.Contains(c.Body, "{{ObjectData}}") {
  432. addObjectData = true
  433. }
  434. }
  435. replacements := params.getStringReplacements(addObjectData)
  436. replacer := strings.NewReplacer(replacements...)
  437. body := replaceWithReplacer(c.Body, replacer)
  438. subject := replaceWithReplacer(c.Subject, replacer)
  439. startTime := time.Now()
  440. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain)
  441. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  442. time.Since(startTime), err)
  443. return err
  444. }
  445. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions) error {
  446. users, err := dataprovider.DumpUsers()
  447. if err != nil {
  448. return fmt.Errorf("unable to get users: %w", err)
  449. }
  450. var failedResets []string
  451. for _, user := range users {
  452. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  453. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, name conditions don't match",
  454. user.Username)
  455. continue
  456. }
  457. if !QuotaScans.AddUserQuotaScan(user.Username) {
  458. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %s", user.Username)
  459. failedResets = append(failedResets, user.Username)
  460. continue
  461. }
  462. numFiles, size, err := user.ScanQuota()
  463. QuotaScans.RemoveUserQuotaScan(user.Username)
  464. if err != nil {
  465. eventManagerLog(logger.LevelError, "error scanning quota for user %s: %v", user.Username, err)
  466. failedResets = append(failedResets, user.Username)
  467. continue
  468. }
  469. err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
  470. if err != nil {
  471. eventManagerLog(logger.LevelError, "error updating quota for user %s: %v", user.Username, err)
  472. failedResets = append(failedResets, user.Username)
  473. continue
  474. }
  475. }
  476. if len(failedResets) > 0 {
  477. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  478. }
  479. return nil
  480. }
  481. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions) error {
  482. folders, err := dataprovider.DumpFolders()
  483. if err != nil {
  484. return fmt.Errorf("unable to get folders: %w", err)
  485. }
  486. var failedResets []string
  487. for _, folder := range folders {
  488. if !checkEventConditionPatterns(folder.Name, conditions.Names) {
  489. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  490. folder.Name)
  491. continue
  492. }
  493. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  494. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %s", folder.Name)
  495. failedResets = append(failedResets, folder.Name)
  496. continue
  497. }
  498. f := vfs.VirtualFolder{
  499. BaseVirtualFolder: folder,
  500. VirtualPath: "/",
  501. }
  502. numFiles, size, err := f.ScanQuota()
  503. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  504. if err != nil {
  505. eventManagerLog(logger.LevelError, "error scanning quota for folder %s: %v", folder.Name, err)
  506. failedResets = append(failedResets, folder.Name)
  507. continue
  508. }
  509. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  510. if err != nil {
  511. eventManagerLog(logger.LevelError, "error updating quota for folder %s: %v", folder.Name, err)
  512. failedResets = append(failedResets, folder.Name)
  513. continue
  514. }
  515. }
  516. if len(failedResets) > 0 {
  517. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  518. }
  519. return nil
  520. }
  521. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions) error {
  522. users, err := dataprovider.DumpUsers()
  523. if err != nil {
  524. return fmt.Errorf("unable to get users: %w", err)
  525. }
  526. var failedResets []string
  527. for _, user := range users {
  528. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  529. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
  530. user.Username)
  531. continue
  532. }
  533. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  534. if err != nil {
  535. eventManagerLog(logger.LevelError, "error updating transfer quota for user %s: %v", user.Username, err)
  536. failedResets = append(failedResets, user.Username)
  537. continue
  538. }
  539. }
  540. if len(failedResets) > 0 {
  541. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  542. }
  543. return nil
  544. }
  545. func executeRuleAction(action dataprovider.BaseEventAction, params EventParams, conditions dataprovider.ConditionOptions) error {
  546. switch action.Type {
  547. case dataprovider.ActionTypeHTTP:
  548. return executeHTTPRuleAction(action.Options.HTTPConfig, params)
  549. case dataprovider.ActionTypeCommand:
  550. return executeCommandRuleAction(action.Options.CmdConfig, params)
  551. case dataprovider.ActionTypeEmail:
  552. return executeEmailRuleAction(action.Options.EmailConfig, params)
  553. case dataprovider.ActionTypeBackup:
  554. return dataprovider.ExecuteBackup()
  555. case dataprovider.ActionTypeUserQuotaReset:
  556. return executeUsersQuotaResetRuleAction(conditions)
  557. case dataprovider.ActionTypeFolderQuotaReset:
  558. return executeFoldersQuotaResetRuleAction(conditions)
  559. case dataprovider.ActionTypeTransferQuotaReset:
  560. return executeTransferQuotaResetRuleAction(conditions)
  561. default:
  562. return fmt.Errorf("unsupported action type: %d", action.Type)
  563. }
  564. }
  565. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  566. var errRes error
  567. for _, rule := range rules {
  568. var failedActions []string
  569. for _, action := range rule.Actions {
  570. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  571. startTime := time.Now()
  572. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  573. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  574. action.Name, rule.Name, time.Since(startTime), err)
  575. failedActions = append(failedActions, action.Name)
  576. // we return the last error, it is ok for now
  577. errRes = err
  578. if action.Options.StopOnFailure {
  579. break
  580. }
  581. } else {
  582. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  583. action.Name, rule.Name, time.Since(startTime))
  584. }
  585. }
  586. }
  587. // execute async actions if any, including failure actions
  588. go executeRuleAsyncActions(rule, params, failedActions)
  589. }
  590. return errRes
  591. }
  592. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  593. for _, rule := range rules {
  594. executeRuleAsyncActions(rule, params, nil)
  595. }
  596. }
  597. func executeRuleAsyncActions(rule dataprovider.EventRule, params EventParams, failedActions []string) {
  598. for _, action := range rule.Actions {
  599. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  600. startTime := time.Now()
  601. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  602. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  603. action.Name, rule.Name, time.Since(startTime), err)
  604. failedActions = append(failedActions, action.Name)
  605. if action.Options.StopOnFailure {
  606. break
  607. }
  608. } else {
  609. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  610. action.Name, rule.Name, time.Since(startTime))
  611. }
  612. }
  613. }
  614. if len(failedActions) > 0 {
  615. // execute failure actions
  616. for _, action := range rule.Actions {
  617. if action.Options.IsFailureAction {
  618. startTime := time.Now()
  619. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  620. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  621. action.Name, rule.Name, time.Since(startTime), err)
  622. if action.Options.StopOnFailure {
  623. break
  624. }
  625. } else {
  626. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  627. action.Name, rule.Name, time.Since(startTime))
  628. }
  629. }
  630. }
  631. }
  632. }
  // eventCronJob is the job registered with the scheduler for each schedule of
  // a rule.
  type eventCronJob struct {
  	// ruleName is the name used to load the rule when the job runs
  	ruleName string
  }
  636. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  637. if rule.GuardFromConcurrentExecution() {
  638. task, err := dataprovider.GetTaskByName(rule.Name)
  639. if _, ok := err.(*util.RecordNotFoundError); ok {
  640. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  641. task = dataprovider.Task{
  642. Name: rule.Name,
  643. UpdateAt: 0,
  644. Version: 0,
  645. }
  646. err = dataprovider.AddTask(rule.Name)
  647. if err != nil {
  648. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  649. return task, err
  650. }
  651. } else {
  652. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  653. }
  654. return task, err
  655. }
  656. return dataprovider.Task{}, nil
  657. }
  658. func (j *eventCronJob) Run() {
  659. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  660. rule, err := dataprovider.EventRuleExists(j.ruleName)
  661. if err != nil {
  662. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  663. return
  664. }
  665. task, err := j.getTask(rule)
  666. if err != nil {
  667. return
  668. }
  669. if task.Name != "" {
  670. updateInterval := 5 * time.Minute
  671. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  672. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  673. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  674. return
  675. }
  676. err = dataprovider.UpdateTask(rule.Name, task.Version)
  677. if err != nil {
  678. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  679. rule.Name, err)
  680. return
  681. }
  682. ticker := time.NewTicker(updateInterval)
  683. done := make(chan bool)
  684. defer func() {
  685. done <- true
  686. ticker.Stop()
  687. }()
  688. go func(taskName string) {
  689. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  690. for {
  691. select {
  692. case <-done:
  693. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  694. return
  695. case <-ticker.C:
  696. err := dataprovider.UpdateTaskTimestamp(taskName)
  697. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  698. }
  699. }
  700. }(task.Name)
  701. executeRuleAsyncActions(rule, EventParams{}, nil)
  702. } else {
  703. executeRuleAsyncActions(rule, EventParams{}, nil)
  704. }
  705. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  706. }
  707. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  708. logger.Log(level, "eventmanager", "", format, v...)
  709. }