1
0

eventmanager.go 88 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "time"
  37. "github.com/bmatcuk/doublestar/v4"
  38. "github.com/klauspost/compress/zip"
  39. "github.com/robfig/cron/v3"
  40. "github.com/rs/xid"
  41. "github.com/sftpgo/sdk"
  42. "github.com/wneessen/go-mail"
  43. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/plugin"
  46. "github.com/drakkan/sftpgo/v2/internal/smtp"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/vfs"
  49. )
  50. const (
  51. ipBlockedEventName = "IP Blocked"
  52. maxAttachmentsSize = int64(10 * 1024 * 1024)
  53. )
  54. // Supported IDP login events
  55. const (
  56. IDPLoginUser = "IDP login user"
  57. IDPLoginAdmin = "IDP login admin"
  58. )
  59. var (
  60. // eventManager handle the supported event rules actions
  61. eventManager eventRulesContainer
  62. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  63. )
  64. func init() {
  65. eventManager = eventRulesContainer{
  66. schedulesMapping: make(map[string][]cron.EntryID),
  67. // arbitrary maximum number of concurrent asynchronous tasks,
  68. // each task could execute multiple actions
  69. concurrencyGuard: make(chan struct{}, 200),
  70. }
  71. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  72. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  73. eventManager.handleProviderEvent(EventParams{
  74. Name: executor,
  75. ObjectName: objectName,
  76. Event: operation,
  77. Status: 1,
  78. ObjectType: objectType,
  79. IP: ip,
  80. Role: role,
  81. Timestamp: time.Now().UnixNano(),
  82. Object: object,
  83. })
  84. })
  85. }
  86. // HandleCertificateEvent checks and executes action rules for certificate events
  87. func HandleCertificateEvent(params EventParams) {
  88. eventManager.handleCertificateEvent(params)
  89. }
  90. // HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider
  91. func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
  92. return eventManager.handleIDPLoginEvent(params, customFields)
  93. }
  94. // eventRulesContainer stores event rules by trigger
  95. type eventRulesContainer struct {
  96. sync.RWMutex
  97. lastLoad atomic.Int64
  98. FsEvents []dataprovider.EventRule
  99. ProviderEvents []dataprovider.EventRule
  100. Schedules []dataprovider.EventRule
  101. IPBlockedEvents []dataprovider.EventRule
  102. CertificateEvents []dataprovider.EventRule
  103. IPDLoginEvents []dataprovider.EventRule
  104. schedulesMapping map[string][]cron.EntryID
  105. concurrencyGuard chan struct{}
  106. }
  107. func (r *eventRulesContainer) addAsyncTask() {
  108. activeHooks.Add(1)
  109. r.concurrencyGuard <- struct{}{}
  110. }
  111. func (r *eventRulesContainer) removeAsyncTask() {
  112. activeHooks.Add(-1)
  113. <-r.concurrencyGuard
  114. }
  115. func (r *eventRulesContainer) getLastLoadTime() int64 {
  116. return r.lastLoad.Load()
  117. }
  118. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  119. r.lastLoad.Store(modTime)
  120. }
  121. // RemoveRule deletes the rule with the specified name
  122. func (r *eventRulesContainer) RemoveRule(name string) {
  123. r.Lock()
  124. defer r.Unlock()
  125. r.removeRuleInternal(name)
  126. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  127. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  128. }
  129. func (r *eventRulesContainer) removeRuleInternal(name string) {
  130. for idx := range r.FsEvents {
  131. if r.FsEvents[idx].Name == name {
  132. lastIdx := len(r.FsEvents) - 1
  133. r.FsEvents[idx] = r.FsEvents[lastIdx]
  134. r.FsEvents = r.FsEvents[:lastIdx]
  135. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  136. return
  137. }
  138. }
  139. for idx := range r.ProviderEvents {
  140. if r.ProviderEvents[idx].Name == name {
  141. lastIdx := len(r.ProviderEvents) - 1
  142. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  143. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  144. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  145. return
  146. }
  147. }
  148. for idx := range r.IPBlockedEvents {
  149. if r.IPBlockedEvents[idx].Name == name {
  150. lastIdx := len(r.IPBlockedEvents) - 1
  151. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  152. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  153. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  154. return
  155. }
  156. }
  157. for idx := range r.CertificateEvents {
  158. if r.CertificateEvents[idx].Name == name {
  159. lastIdx := len(r.CertificateEvents) - 1
  160. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  161. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  162. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  163. return
  164. }
  165. }
  166. for idx := range r.IPDLoginEvents {
  167. if r.IPDLoginEvents[idx].Name == name {
  168. lastIdx := len(r.IPDLoginEvents) - 1
  169. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  170. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  171. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  172. return
  173. }
  174. }
  175. for idx := range r.Schedules {
  176. if r.Schedules[idx].Name == name {
  177. if schedules, ok := r.schedulesMapping[name]; ok {
  178. for _, entryID := range schedules {
  179. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  180. eventScheduler.Remove(entryID)
  181. }
  182. delete(r.schedulesMapping, name)
  183. }
  184. lastIdx := len(r.Schedules) - 1
  185. r.Schedules[idx] = r.Schedules[lastIdx]
  186. r.Schedules = r.Schedules[:lastIdx]
  187. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  188. return
  189. }
  190. }
  191. }
  192. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  193. r.removeRuleInternal(rule.Name)
  194. if rule.DeletedAt > 0 {
  195. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  196. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  197. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  198. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  199. }
  200. return
  201. }
  202. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  203. return
  204. }
  205. switch rule.Trigger {
  206. case dataprovider.EventTriggerFsEvent:
  207. r.FsEvents = append(r.FsEvents, rule)
  208. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  209. case dataprovider.EventTriggerProviderEvent:
  210. r.ProviderEvents = append(r.ProviderEvents, rule)
  211. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  212. case dataprovider.EventTriggerIPBlocked:
  213. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  214. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  215. case dataprovider.EventTriggerCertificate:
  216. r.CertificateEvents = append(r.CertificateEvents, rule)
  217. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  218. case dataprovider.EventTriggerIDPLogin:
  219. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  220. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  221. case dataprovider.EventTriggerSchedule:
  222. for _, schedule := range rule.Conditions.Schedules {
  223. cronSpec := schedule.GetCronSpec()
  224. job := &eventCronJob{
  225. ruleName: dataprovider.ConvertName(rule.Name),
  226. }
  227. entryID, err := eventScheduler.AddJob(cronSpec, job)
  228. if err != nil {
  229. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  230. return
  231. }
  232. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  233. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  234. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  235. }
  236. r.Schedules = append(r.Schedules, rule)
  237. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  238. default:
  239. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  240. }
  241. }
  242. func (r *eventRulesContainer) loadRules() {
  243. eventManagerLog(logger.LevelDebug, "loading updated rules")
  244. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  245. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  246. if err != nil {
  247. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  248. return
  249. }
  250. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  251. if len(rules) > 0 {
  252. r.Lock()
  253. defer r.Unlock()
  254. for _, rule := range rules {
  255. r.addUpdateRuleInternal(rule)
  256. }
  257. }
  258. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  259. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  260. r.setLastLoadTime(modTime)
  261. }
  262. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  263. switch conditions.IDPLoginEvent {
  264. case dataprovider.IDPLoginUser:
  265. if params.Event != IDPLoginUser {
  266. return false
  267. }
  268. case dataprovider.IDPLoginAdmin:
  269. if params.Event != IDPLoginAdmin {
  270. return false
  271. }
  272. }
  273. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  274. }
  275. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  276. if !util.Contains(conditions.ProviderEvents, params.Event) {
  277. return false
  278. }
  279. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  280. return false
  281. }
  282. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  283. return false
  284. }
  285. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  286. return false
  287. }
  288. return true
  289. }
  290. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  291. if !util.Contains(conditions.FsEvents, params.Event) {
  292. return false
  293. }
  294. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  295. return false
  296. }
  297. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  298. return false
  299. }
  300. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  301. return false
  302. }
  303. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  304. return false
  305. }
  306. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  307. return false
  308. }
  309. if params.Event == operationUpload || params.Event == operationDownload {
  310. if conditions.Options.MinFileSize > 0 {
  311. if params.FileSize < conditions.Options.MinFileSize {
  312. return false
  313. }
  314. }
  315. if conditions.Options.MaxFileSize > 0 {
  316. if params.FileSize > conditions.Options.MaxFileSize {
  317. return false
  318. }
  319. }
  320. }
  321. return true
  322. }
  323. // hasFsRules returns true if there are any rules for filesystem event triggers
  324. func (r *eventRulesContainer) hasFsRules() bool {
  325. r.RLock()
  326. defer r.RUnlock()
  327. return len(r.FsEvents) > 0
  328. }
  329. // handleFsEvent executes the rules actions defined for the specified event.
  330. // The boolean parameter indicates whether a sync action was executed
  331. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  332. if params.Protocol == protocolEventAction {
  333. return false, nil
  334. }
  335. r.RLock()
  336. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  337. for _, rule := range r.FsEvents {
  338. if r.checkFsEventMatch(&rule.Conditions, &params) {
  339. if err := rule.CheckActionsConsistency(""); err != nil {
  340. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  341. rule.Name, err, params.Event)
  342. continue
  343. }
  344. hasSyncActions := false
  345. for _, action := range rule.Actions {
  346. if action.Options.ExecuteSync {
  347. hasSyncActions = true
  348. break
  349. }
  350. }
  351. if hasSyncActions {
  352. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  353. } else {
  354. rulesAsync = append(rulesAsync, rule)
  355. }
  356. }
  357. }
  358. r.RUnlock()
  359. params.sender = params.Name
  360. if len(rulesAsync) > 0 {
  361. go executeAsyncRulesActions(rulesAsync, params)
  362. }
  363. if len(rulesWithSyncActions) > 0 {
  364. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  365. }
  366. return false, nil
  367. }
  368. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  369. *dataprovider.Admin, error,
  370. ) {
  371. r.RLock()
  372. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  373. for _, rule := range r.IPDLoginEvents {
  374. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  375. if err := rule.CheckActionsConsistency(""); err != nil {
  376. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  377. rule.Name, err, params.Event)
  378. continue
  379. }
  380. hasSyncActions := false
  381. for _, action := range rule.Actions {
  382. if action.Options.ExecuteSync {
  383. hasSyncActions = true
  384. break
  385. }
  386. }
  387. if hasSyncActions {
  388. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  389. } else {
  390. rulesAsync = append(rulesAsync, rule)
  391. }
  392. }
  393. }
  394. r.RUnlock()
  395. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  396. return nil, nil, nil
  397. }
  398. params.addIDPCustomFields(customFields)
  399. if len(rulesWithSyncActions) > 1 {
  400. var ruleNames []string
  401. for _, r := range rulesWithSyncActions {
  402. ruleNames = append(ruleNames, r.Name)
  403. }
  404. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  405. }
  406. if len(rulesAsync) > 0 {
  407. go executeAsyncRulesActions(rulesAsync, params)
  408. }
  409. if len(rulesWithSyncActions) > 0 {
  410. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  411. }
  412. return nil, nil, nil
  413. }
  414. // username is populated for user objects
  415. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  416. r.RLock()
  417. defer r.RUnlock()
  418. var rules []dataprovider.EventRule
  419. for _, rule := range r.ProviderEvents {
  420. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  421. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  422. rules = append(rules, rule)
  423. } else {
  424. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  425. rule.Name, err, params.Event, params.ObjectType)
  426. }
  427. }
  428. }
  429. if len(rules) > 0 {
  430. params.sender = params.ObjectName
  431. go executeAsyncRulesActions(rules, params)
  432. }
  433. }
  434. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  435. r.RLock()
  436. defer r.RUnlock()
  437. if len(r.IPBlockedEvents) == 0 {
  438. return
  439. }
  440. var rules []dataprovider.EventRule
  441. for _, rule := range r.IPBlockedEvents {
  442. if err := rule.CheckActionsConsistency(""); err == nil {
  443. rules = append(rules, rule)
  444. } else {
  445. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  446. rule.Name, err, params.Event)
  447. }
  448. }
  449. if len(rules) > 0 {
  450. go executeAsyncRulesActions(rules, params)
  451. }
  452. }
  453. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  454. r.RLock()
  455. defer r.RUnlock()
  456. if len(r.CertificateEvents) == 0 {
  457. return
  458. }
  459. var rules []dataprovider.EventRule
  460. for _, rule := range r.CertificateEvents {
  461. if err := rule.CheckActionsConsistency(""); err == nil {
  462. rules = append(rules, rule)
  463. } else {
  464. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  465. rule.Name, err, params.Event)
  466. }
  467. }
  468. if len(rules) > 0 {
  469. go executeAsyncRulesActions(rules, params)
  470. }
  471. }
  472. type executedRetentionCheck struct {
  473. Username string
  474. ActionName string
  475. Results []folderRetentionCheckResult
  476. }
  477. // EventParams defines the supported event parameters
  478. type EventParams struct {
  479. Name string
  480. Groups []sdk.GroupMapping
  481. Event string
  482. Status int
  483. VirtualPath string
  484. FsPath string
  485. VirtualTargetPath string
  486. FsTargetPath string
  487. ObjectName string
  488. ObjectType string
  489. FileSize int64
  490. Elapsed int64
  491. Protocol string
  492. IP string
  493. Role string
  494. Timestamp int64
  495. IDPCustomFields *map[string]string
  496. Object plugin.Renderer
  497. sender string
  498. updateStatusFromError bool
  499. errors []string
  500. retentionChecks []executedRetentionCheck
  501. }
  502. func (p *EventParams) getACopy() *EventParams {
  503. params := *p
  504. params.errors = make([]string, len(p.errors))
  505. copy(params.errors, p.errors)
  506. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  507. for _, c := range p.retentionChecks {
  508. executedCheck := executedRetentionCheck{
  509. Username: c.Username,
  510. ActionName: c.ActionName,
  511. }
  512. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  513. copy(executedCheck.Results, c.Results)
  514. retentionChecks = append(retentionChecks, executedCheck)
  515. }
  516. params.retentionChecks = retentionChecks
  517. if p.IDPCustomFields != nil {
  518. fields := make(map[string]string)
  519. for k, v := range *p.IDPCustomFields {
  520. fields[k] = v
  521. }
  522. params.IDPCustomFields = &fields
  523. }
  524. return &params
  525. }
  526. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  527. if customFields == nil {
  528. return
  529. }
  530. fields := make(map[string]string)
  531. for k, v := range *customFields {
  532. switch val := v.(type) {
  533. case string:
  534. fields[k] = val
  535. }
  536. }
  537. p.IDPCustomFields = &fields
  538. }
  539. // AddError adds a new error to the event params and update the status if needed
  540. func (p *EventParams) AddError(err error) {
  541. if err == nil {
  542. return
  543. }
  544. if p.updateStatusFromError && p.Status == 1 {
  545. p.Status = 2
  546. }
  547. p.errors = append(p.errors, err.Error())
  548. }
  549. func (p *EventParams) setBackupParams(backupPath string) {
  550. if p.sender != "" {
  551. return
  552. }
  553. p.sender = dataprovider.ActionExecutorSystem
  554. p.FsPath = backupPath
  555. p.ObjectName = filepath.Base(backupPath)
  556. p.VirtualPath = "/" + p.ObjectName
  557. p.Timestamp = time.Now().UnixNano()
  558. info, err := os.Stat(backupPath)
  559. if err == nil {
  560. p.FileSize = info.Size()
  561. }
  562. }
  563. func (p *EventParams) getStatusString() string {
  564. switch p.Status {
  565. case 1:
  566. return "OK"
  567. default:
  568. return "KO"
  569. }
  570. }
  571. // getUsers returns users with group settings not applied
  572. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  573. if p.sender == "" {
  574. users, err := dataprovider.DumpUsers()
  575. if err != nil {
  576. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  577. return users, errors.New("unable to get users")
  578. }
  579. return users, nil
  580. }
  581. user, err := p.getUserFromSender()
  582. if err != nil {
  583. return nil, err
  584. }
  585. return []dataprovider.User{user}, nil
  586. }
  587. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  588. if p.sender == dataprovider.ActionExecutorSystem {
  589. return dataprovider.User{
  590. BaseUser: sdk.BaseUser{
  591. Status: 1,
  592. Username: p.sender,
  593. HomeDir: dataprovider.GetBackupsPath(),
  594. Permissions: map[string][]string{
  595. "/": {dataprovider.PermAny},
  596. },
  597. },
  598. }, nil
  599. }
  600. user, err := dataprovider.UserExists(p.sender, "")
  601. if err != nil {
  602. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  603. return user, fmt.Errorf("error getting user %q", p.sender)
  604. }
  605. return user, nil
  606. }
  607. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  608. if p.sender == "" {
  609. return dataprovider.DumpFolders()
  610. }
  611. folder, err := dataprovider.GetFolderByName(p.sender)
  612. if err != nil {
  613. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  614. }
  615. return []vfs.BaseVirtualFolder{folder}, nil
  616. }
  617. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  618. if len(p.retentionChecks) == 0 {
  619. return nil, errors.New("no data retention report available")
  620. }
  621. var b bytes.Buffer
  622. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  623. return nil, err
  624. }
  625. return b.Bytes(), nil
  626. }
  627. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  628. var n int64
  629. wr := zip.NewWriter(w)
  630. for _, check := range p.retentionChecks {
  631. data, err := getCSVRetentionReport(check.Results)
  632. if err != nil {
  633. return n, fmt.Errorf("unable to get CSV report: %w", err)
  634. }
  635. dataSize := int64(len(data))
  636. n += dataSize
  637. // we suppose a 3:1 compression ratio
  638. if n > (maxAttachmentsSize * 3) {
  639. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  640. util.ByteCountIEC(n))
  641. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  642. }
  643. fh := &zip.FileHeader{
  644. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  645. Method: zip.Deflate,
  646. Modified: time.Now().UTC(),
  647. }
  648. f, err := wr.CreateHeader(fh)
  649. if err != nil {
  650. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  651. }
  652. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  653. if err != nil {
  654. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  655. }
  656. }
  657. if err := wr.Close(); err != nil {
  658. return n, fmt.Errorf("unable to close zip writer: %w", err)
  659. }
  660. return n, nil
  661. }
  662. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  663. if len(p.retentionChecks) == 0 {
  664. return nil, errors.New("no data retention report available")
  665. }
  666. return &mail.File{
  667. Name: "retention-reports.zip",
  668. Header: make(map[string][]string),
  669. Writer: p.writeCompressedDataRetentionReports,
  670. }, nil
  671. }
  672. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  673. if jsonEscaped {
  674. return util.JSONEscape(val)
  675. }
  676. return val
  677. }
  678. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  679. replacements := []string{
  680. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  681. "{{Event}}", p.Event,
  682. "{{Status}}", fmt.Sprintf("%d", p.Status),
  683. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  684. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  685. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  686. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  687. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  688. "{{ObjectType}}", p.ObjectType,
  689. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  690. "{{Elapsed}}", fmt.Sprintf("%d", p.Elapsed),
  691. "{{Protocol}}", p.Protocol,
  692. "{{IP}}", p.IP,
  693. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  694. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  695. "{{StatusString}}", p.getStatusString(),
  696. }
  697. if p.VirtualPath != "" {
  698. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  699. }
  700. if p.VirtualTargetPath != "" {
  701. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  702. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  703. }
  704. if len(p.errors) > 0 {
  705. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  706. } else {
  707. replacements = append(replacements, "{{ErrorString}}", "")
  708. }
  709. replacements = append(replacements, "{{ObjectData}}", "")
  710. if addObjectData {
  711. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  712. if err == nil {
  713. replacements[len(replacements)-1] = string(data)
  714. }
  715. }
  716. if p.IDPCustomFields != nil {
  717. for k, v := range *p.IDPCustomFields {
  718. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  719. }
  720. }
  721. return replacements
  722. }
  723. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  724. var b bytes.Buffer
  725. csvWriter := csv.NewWriter(&b)
  726. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  727. "elapsed (ms)", "info", "error"})
  728. if err != nil {
  729. return nil, err
  730. }
  731. for _, result := range results {
  732. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  733. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  734. result.Info, result.Error})
  735. if err != nil {
  736. return nil, err
  737. }
  738. }
  739. csvWriter.Flush()
  740. err = csvWriter.Error()
  741. return b.Bytes(), err
  742. }
  743. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
  744. numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
  745. ) error {
  746. var fsDstPath string
  747. var errDstFs error
  748. errWrite := w.Close()
  749. targetPath := virtualSourcePath
  750. if virtualTargetPath != "" {
  751. targetPath = virtualTargetPath
  752. var fsDst vfs.Fs
  753. fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
  754. if errTransfer != nil && errDstFs == nil {
  755. // try to remove a partial file on error. If this fails, we can't do anything
  756. errRemove := fsDst.Remove(fsDstPath, false)
  757. conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
  758. }
  759. }
  760. info, err := conn.doStatInternal(targetPath, 0, false, false)
  761. if err == nil {
  762. updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
  763. var fsSrcPath string
  764. var errSrcFs error
  765. if virtualSourcePath != "" {
  766. _, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
  767. }
  768. if errSrcFs == nil && errDstFs == nil {
  769. elapsed := time.Since(startTime).Nanoseconds() / 1000000
  770. if errTransfer == nil {
  771. errTransfer = errWrite
  772. }
  773. if operation == operationCopy {
  774. logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
  775. "", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
  776. }
  777. ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed) //nolint:errcheck
  778. }
  779. } else {
  780. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
  781. }
  782. if errTransfer != nil {
  783. return errTransfer
  784. }
  785. return errWrite
  786. }
  787. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  788. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  789. if err != nil {
  790. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  791. return
  792. }
  793. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  794. if vfolder.IsIncludedInUserQuota() {
  795. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  796. }
  797. }
  798. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  799. if numFiles == 0 {
  800. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  801. return conn.GetPermissionDeniedError()
  802. }
  803. } else {
  804. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  805. return conn.GetPermissionDeniedError()
  806. }
  807. }
  808. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  809. if !q.HasSpace {
  810. return conn.GetQuotaExceededError()
  811. }
  812. if expectedSize != -1 {
  813. sizeDiff := expectedSize - truncatedSize
  814. if sizeDiff > 0 {
  815. remainingSize := q.GetRemainingSize()
  816. if remainingSize > 0 && remainingSize < sizeDiff {
  817. return conn.GetQuotaExceededError()
  818. }
  819. }
  820. }
  821. return nil
  822. }
  823. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  824. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  825. if err != nil {
  826. return nil, 0, 0, nil, err
  827. }
  828. var truncatedSize, fileSize int64
  829. numFiles := 1
  830. isFileOverwrite := false
  831. info, err := fs.Lstat(fsPath)
  832. if err == nil {
  833. fileSize = info.Size()
  834. if info.IsDir() {
  835. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  836. }
  837. if info.Mode().IsRegular() {
  838. isFileOverwrite = true
  839. truncatedSize = fileSize
  840. }
  841. numFiles = 0
  842. }
  843. if err != nil && !fs.IsNotExist(err) {
  844. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  845. }
  846. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  847. return nil, numFiles, truncatedSize, nil, err
  848. }
  849. f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1))
  850. if err != nil {
  851. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  852. }
  853. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  854. if isFileOverwrite {
  855. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  856. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  857. truncatedSize = 0
  858. }
  859. }
  860. if cancelFn == nil {
  861. cancelFn = func() {}
  862. }
  863. if f != nil {
  864. return f, numFiles, truncatedSize, cancelFn, nil
  865. }
  866. return w, numFiles, truncatedSize, cancelFn, nil
  867. }
  868. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  869. if entryPath == wr.Name {
  870. // skip the archive itself
  871. return nil
  872. }
  873. info, err := conn.DoStat(entryPath, 1, false)
  874. if err != nil {
  875. eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
  876. return err
  877. }
  878. entryName, err := getZipEntryName(entryPath, baseDir)
  879. if err != nil {
  880. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  881. return err
  882. }
  883. if _, ok := wr.Entries[entryName]; ok {
  884. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  885. return nil
  886. }
  887. wr.Entries[entryName] = true
  888. if info.IsDir() {
  889. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  890. Name: entryName + "/",
  891. Method: zip.Deflate,
  892. Modified: info.ModTime(),
  893. })
  894. if err != nil {
  895. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  896. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  897. }
  898. contents, err := conn.ListDir(entryPath)
  899. if err != nil {
  900. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  901. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  902. }
  903. for _, info := range contents {
  904. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  905. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  906. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  907. return err
  908. }
  909. }
  910. return nil
  911. }
  912. if !info.Mode().IsRegular() {
  913. // we only allow regular files
  914. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  915. return nil
  916. }
  917. reader, cancelFn, err := getFileReader(conn, entryPath)
  918. if err != nil {
  919. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  920. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  921. }
  922. defer cancelFn()
  923. defer reader.Close()
  924. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  925. Name: entryName,
  926. Method: zip.Deflate,
  927. Modified: info.ModTime(),
  928. })
  929. if err != nil {
  930. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  931. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  932. }
  933. _, err = io.Copy(f, reader)
  934. return err
  935. }
  936. func getZipEntryName(entryPath, baseDir string) (string, error) {
  937. if !strings.HasPrefix(entryPath, baseDir) {
  938. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  939. }
  940. entryPath = strings.TrimPrefix(entryPath, baseDir)
  941. return strings.TrimPrefix(entryPath, "/"), nil
  942. }
  943. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  944. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  945. return nil, nil, conn.GetPermissionDeniedError()
  946. }
  947. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  948. if err != nil {
  949. return nil, nil, err
  950. }
  951. f, r, cancelFn, err := fs.Open(fsPath, 0)
  952. if err != nil {
  953. return nil, nil, conn.GetFsError(fs, err)
  954. }
  955. if cancelFn == nil {
  956. cancelFn = func() {}
  957. }
  958. if f != nil {
  959. return f, cancelFn, nil
  960. }
  961. return r, cancelFn, nil
  962. }
  963. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  964. reader, cancelFn, err := getFileReader(conn, virtualPath)
  965. if err != nil {
  966. return err
  967. }
  968. defer cancelFn()
  969. defer reader.Close()
  970. _, err = io.Copy(w, reader)
  971. return err
  972. }
  973. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  974. return func(w io.Writer) (int64, error) {
  975. reader, cancelFn, err := getFileReader(conn, virtualPath)
  976. if err != nil {
  977. return 0, err
  978. }
  979. defer cancelFn()
  980. defer reader.Close()
  981. return io.CopyN(w, reader, size)
  982. }
  983. }
  984. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  985. var files []*mail.File
  986. totalSize := int64(0)
  987. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  988. info, err := conn.DoStat(virtualPath, 0, false)
  989. if err != nil {
  990. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  991. }
  992. if !info.Mode().IsRegular() {
  993. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  994. }
  995. totalSize += info.Size()
  996. if totalSize > maxAttachmentsSize {
  997. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  998. }
  999. files = append(files, &mail.File{
  1000. Name: path.Base(virtualPath),
  1001. Header: make(map[string][]string),
  1002. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1003. })
  1004. }
  1005. return files, nil
  1006. }
  1007. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  1008. if !strings.Contains(input, "{{") {
  1009. return input
  1010. }
  1011. return replacer.Replace(input)
  1012. }
  1013. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1014. var matched bool
  1015. var err error
  1016. if strings.Contains(p.Pattern, "**") {
  1017. matched, err = doublestar.Match(p.Pattern, name)
  1018. } else {
  1019. matched, err = path.Match(p.Pattern, name)
  1020. }
  1021. if err != nil {
  1022. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1023. return false
  1024. }
  1025. if p.InverseMatch {
  1026. return !matched
  1027. }
  1028. return matched
  1029. }
  1030. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1031. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1032. return false
  1033. }
  1034. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1035. return false
  1036. }
  1037. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  1038. return false
  1039. }
  1040. return true
  1041. }
  1042. // checkConditionPatterns returns false if patterns are defined and no match is found
  1043. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1044. if len(patterns) == 0 {
  1045. return true
  1046. }
  1047. for _, p := range patterns {
  1048. if checkEventConditionPattern(p, name) {
  1049. return true
  1050. }
  1051. }
  1052. return false
  1053. }
  1054. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1055. if len(patterns) == 0 {
  1056. return true
  1057. }
  1058. for _, group := range groups {
  1059. for _, p := range patterns {
  1060. if checkEventConditionPattern(p, group.Name) {
  1061. return true
  1062. }
  1063. }
  1064. }
  1065. return false
  1066. }
  1067. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1068. u, err := url.Parse(c.Endpoint)
  1069. if err != nil {
  1070. return "", fmt.Errorf("invalid endpoint: %w", err)
  1071. }
  1072. if strings.Contains(u.Path, "{{") {
  1073. pathComponents := strings.Split(u.Path, "/")
  1074. for idx := range pathComponents {
  1075. part := replaceWithReplacer(pathComponents[idx], replacer)
  1076. if part != pathComponents[idx] {
  1077. pathComponents[idx] = url.PathEscape(part)
  1078. }
  1079. }
  1080. u.Path = ""
  1081. u = u.JoinPath(pathComponents...)
  1082. }
  1083. if len(c.QueryParameters) > 0 {
  1084. q := u.Query()
  1085. for _, keyVal := range c.QueryParameters {
  1086. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1087. }
  1088. u.RawQuery = q.Encode()
  1089. }
  1090. return u.String(), nil
  1091. }
  1092. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1093. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1094. ) error {
  1095. partWriter, err := m.CreatePart(h)
  1096. if err != nil {
  1097. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1098. return err
  1099. }
  1100. if part.Body != "" {
  1101. cType := h.Get("Content-Type")
  1102. if strings.Contains(strings.ToLower(cType), "application/json") {
  1103. replacements := params.getStringReplacements(addObjectData, true)
  1104. jsonReplacer := strings.NewReplacer(replacements...)
  1105. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, jsonReplacer)))
  1106. } else {
  1107. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  1108. }
  1109. if err != nil {
  1110. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1111. return err
  1112. }
  1113. return nil
  1114. }
  1115. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1116. data, err := params.getCompressedDataRetentionReport()
  1117. if err != nil {
  1118. return err
  1119. }
  1120. _, err = partWriter.Write(data)
  1121. if err != nil {
  1122. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1123. return err
  1124. }
  1125. return nil
  1126. }
  1127. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1128. if err != nil {
  1129. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1130. return err
  1131. }
  1132. return nil
  1133. }
  1134. func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1135. cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
  1136. ) (io.ReadCloser, string, error) {
  1137. var body io.ReadCloser
  1138. if c.Method == http.MethodGet {
  1139. return body, "", nil
  1140. }
  1141. if c.Body != "" {
  1142. if c.Body == dataprovider.RetentionReportPlaceHolder {
  1143. data, err := params.getCompressedDataRetentionReport()
  1144. if err != nil {
  1145. return body, "", err
  1146. }
  1147. return io.NopCloser(bytes.NewBuffer(data)), "", nil
  1148. }
  1149. if c.HasJSONBody() {
  1150. replacements := params.getStringReplacements(addObjectData, true)
  1151. jsonReplacer := strings.NewReplacer(replacements...)
  1152. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer))), "", nil
  1153. }
  1154. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
  1155. }
  1156. if len(c.Parts) > 0 {
  1157. r, w := io.Pipe()
  1158. m := multipart.NewWriter(w)
  1159. var conn *BaseConnection
  1160. if user.Username != "" {
  1161. var err error
  1162. user, err = getUserForEventAction(user)
  1163. if err != nil {
  1164. return body, "", err
  1165. }
  1166. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1167. err = user.CheckFsRoot(connectionID)
  1168. if err != nil {
  1169. user.CloseFs() //nolint:errcheck
  1170. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  1171. user.Username, err)
  1172. }
  1173. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1174. }
  1175. go func() {
  1176. defer w.Close()
  1177. defer user.CloseFs() //nolint:errcheck
  1178. for _, part := range c.Parts {
  1179. h := make(textproto.MIMEHeader)
  1180. if part.Body != "" {
  1181. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  1182. } else {
  1183. h.Set("Content-Disposition",
  1184. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  1185. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  1186. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  1187. if contentType == "" {
  1188. contentType = "application/octet-stream"
  1189. }
  1190. h.Set("Content-Type", contentType)
  1191. }
  1192. for _, keyVal := range part.Headers {
  1193. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1194. }
  1195. if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
  1196. cancel()
  1197. return
  1198. }
  1199. }
  1200. m.Close()
  1201. }()
  1202. return r, m.FormDataContentType(), nil
  1203. }
  1204. return body, "", nil
  1205. }
  1206. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1207. if err := c.TryDecryptPassword(); err != nil {
  1208. return err
  1209. }
  1210. addObjectData := false
  1211. if params.Object != nil {
  1212. addObjectData = c.HasObjectData()
  1213. }
  1214. replacements := params.getStringReplacements(addObjectData, false)
  1215. replacer := strings.NewReplacer(replacements...)
  1216. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  1217. if err != nil {
  1218. return err
  1219. }
  1220. ctx, cancel := c.GetContext()
  1221. defer cancel()
  1222. var user dataprovider.User
  1223. if c.HasMultipartFiles() {
  1224. user, err = params.getUserFromSender()
  1225. if err != nil {
  1226. return err
  1227. }
  1228. }
  1229. body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
  1230. if err != nil {
  1231. return err
  1232. }
  1233. if body != nil {
  1234. defer body.Close()
  1235. }
  1236. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1237. if err != nil {
  1238. return err
  1239. }
  1240. if contentType != "" {
  1241. req.Header.Set("Content-Type", contentType)
  1242. }
  1243. if c.Username != "" {
  1244. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1245. }
  1246. for _, keyVal := range c.Headers {
  1247. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1248. }
  1249. client := c.GetHTTPClient()
  1250. defer client.CloseIdleConnections()
  1251. startTime := time.Now()
  1252. resp, err := client.Do(req)
  1253. if err != nil {
  1254. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1255. endpoint, time.Since(startTime), err)
  1256. return fmt.Errorf("error sending HTTP request: %w", err)
  1257. }
  1258. defer resp.Body.Close()
  1259. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1260. endpoint, time.Since(startTime), resp.StatusCode)
  1261. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1262. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1263. }
  1264. return nil
  1265. }
  1266. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1267. addObjectData := false
  1268. if params.Object != nil {
  1269. for _, k := range c.EnvVars {
  1270. if strings.Contains(k.Value, "{{ObjectData}}") {
  1271. addObjectData = true
  1272. break
  1273. }
  1274. }
  1275. }
  1276. replacements := params.getStringReplacements(addObjectData, false)
  1277. replacer := strings.NewReplacer(replacements...)
  1278. args := make([]string, 0, len(c.Args))
  1279. for _, arg := range c.Args {
  1280. args = append(args, replaceWithReplacer(arg, replacer))
  1281. }
  1282. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1283. defer cancel()
  1284. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1285. cmd.Env = []string{}
  1286. for _, keyVal := range c.EnvVars {
  1287. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1288. }
  1289. startTime := time.Now()
  1290. err := cmd.Run()
  1291. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1292. c.Cmd, time.Since(startTime), err)
  1293. return err
  1294. }
  1295. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1296. addObjectData := false
  1297. if params.Object != nil {
  1298. if strings.Contains(c.Body, "{{ObjectData}}") {
  1299. addObjectData = true
  1300. }
  1301. }
  1302. replacements := params.getStringReplacements(addObjectData, false)
  1303. replacer := strings.NewReplacer(replacements...)
  1304. body := replaceWithReplacer(c.Body, replacer)
  1305. subject := replaceWithReplacer(c.Subject, replacer)
  1306. recipients := make([]string, 0, len(c.Recipients))
  1307. for _, recipient := range c.Recipients {
  1308. recipients = append(recipients, replaceWithReplacer(recipient, replacer))
  1309. }
  1310. startTime := time.Now()
  1311. var files []*mail.File
  1312. fileAttachments := make([]string, 0, len(c.Attachments))
  1313. for _, attachment := range c.Attachments {
  1314. if attachment == dataprovider.RetentionReportPlaceHolder {
  1315. f, err := params.getRetentionReportsAsMailAttachment()
  1316. if err != nil {
  1317. return err
  1318. }
  1319. files = append(files, f)
  1320. continue
  1321. }
  1322. fileAttachments = append(fileAttachments, attachment)
  1323. }
  1324. if len(fileAttachments) > 0 {
  1325. user, err := params.getUserFromSender()
  1326. if err != nil {
  1327. return err
  1328. }
  1329. user, err = getUserForEventAction(user)
  1330. if err != nil {
  1331. return err
  1332. }
  1333. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1334. err = user.CheckFsRoot(connectionID)
  1335. defer user.CloseFs() //nolint:errcheck
  1336. if err != nil {
  1337. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1338. }
  1339. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1340. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1341. if err != nil {
  1342. return err
  1343. }
  1344. files = append(files, res...)
  1345. }
  1346. err := smtp.SendEmail(recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
  1347. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1348. time.Since(startTime), err)
  1349. if err != nil {
  1350. return fmt.Errorf("unable to send email: %w", err)
  1351. }
  1352. return nil
  1353. }
  1354. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1355. err := user.LoadAndApplyGroupSettings()
  1356. if err != nil {
  1357. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1358. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1359. }
  1360. user.UploadDataTransfer = 0
  1361. user.UploadBandwidth = 0
  1362. user.DownloadBandwidth = 0
  1363. user.Filters.DisableFsChecks = false
  1364. user.Filters.FilePatterns = nil
  1365. user.Filters.BandwidthLimits = nil
  1366. user.Filters.DataTransferLimits = nil
  1367. for k := range user.Permissions {
  1368. user.Permissions[k] = []string{dataprovider.PermAny}
  1369. }
  1370. return user, nil
  1371. }
  1372. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1373. results := make([]string, 0, len(paths))
  1374. for _, p := range paths {
  1375. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1376. }
  1377. return util.RemoveDuplicates(results, false)
  1378. }
  1379. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1380. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1381. if err != nil {
  1382. return err
  1383. }
  1384. return conn.RemoveFile(fs, fsPath, item, info)
  1385. }
  1386. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1387. user, err := getUserForEventAction(user)
  1388. if err != nil {
  1389. return err
  1390. }
  1391. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1392. err = user.CheckFsRoot(connectionID)
  1393. defer user.CloseFs() //nolint:errcheck
  1394. if err != nil {
  1395. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1396. }
  1397. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1398. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1399. info, err := conn.DoStat(item, 0, false)
  1400. if err != nil {
  1401. if conn.IsNotExistError(err) {
  1402. continue
  1403. }
  1404. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1405. }
  1406. if info.IsDir() {
  1407. if err = conn.RemoveDir(item); err != nil {
  1408. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1409. }
  1410. } else {
  1411. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1412. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1413. }
  1414. }
  1415. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1416. }
  1417. return nil
  1418. }
  1419. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1420. conditions dataprovider.ConditionOptions, params *EventParams,
  1421. ) error {
  1422. users, err := params.getUsers()
  1423. if err != nil {
  1424. return fmt.Errorf("unable to get users: %w", err)
  1425. }
  1426. var failures []string
  1427. executed := 0
  1428. for _, user := range users {
  1429. // if sender is set, the conditions have already been evaluated
  1430. if params.sender == "" {
  1431. if !checkUserConditionOptions(&user, &conditions) {
  1432. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1433. user.Username)
  1434. continue
  1435. }
  1436. }
  1437. executed++
  1438. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1439. params.AddError(err)
  1440. failures = append(failures, user.Username)
  1441. }
  1442. }
  1443. if len(failures) > 0 {
  1444. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1445. }
  1446. if executed == 0 {
  1447. eventManagerLog(logger.LevelError, "no delete executed")
  1448. return errors.New("no delete executed")
  1449. }
  1450. return nil
  1451. }
  1452. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1453. user, err := getUserForEventAction(user)
  1454. if err != nil {
  1455. return err
  1456. }
  1457. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1458. err = user.CheckFsRoot(connectionID)
  1459. defer user.CloseFs() //nolint:errcheck
  1460. if err != nil {
  1461. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1462. }
  1463. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1464. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1465. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1466. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1467. }
  1468. if err = conn.createDirIfMissing(item); err != nil {
  1469. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1470. }
  1471. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1472. }
  1473. return nil
  1474. }
  1475. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1476. conditions dataprovider.ConditionOptions, params *EventParams,
  1477. ) error {
  1478. users, err := params.getUsers()
  1479. if err != nil {
  1480. return fmt.Errorf("unable to get users: %w", err)
  1481. }
  1482. var failures []string
  1483. executed := 0
  1484. for _, user := range users {
  1485. // if sender is set, the conditions have already been evaluated
  1486. if params.sender == "" {
  1487. if !checkUserConditionOptions(&user, &conditions) {
  1488. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1489. user.Username)
  1490. continue
  1491. }
  1492. }
  1493. executed++
  1494. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1495. failures = append(failures, user.Username)
  1496. }
  1497. }
  1498. if len(failures) > 0 {
  1499. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1500. }
  1501. if executed == 0 {
  1502. eventManagerLog(logger.LevelError, "no mkdir executed")
  1503. return errors.New("no mkdir executed")
  1504. }
  1505. return nil
  1506. }
  1507. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1508. user dataprovider.User,
  1509. ) error {
  1510. user, err := getUserForEventAction(user)
  1511. if err != nil {
  1512. return err
  1513. }
  1514. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1515. err = user.CheckFsRoot(connectionID)
  1516. defer user.CloseFs() //nolint:errcheck
  1517. if err != nil {
  1518. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1519. }
  1520. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1521. for _, item := range renames {
  1522. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1523. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1524. if err = conn.renameInternal(source, target, true); err != nil {
  1525. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1526. }
  1527. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1528. }
  1529. return nil
  1530. }
  1531. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1532. user dataprovider.User,
  1533. ) error {
  1534. user, err := getUserForEventAction(user)
  1535. if err != nil {
  1536. return err
  1537. }
  1538. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1539. err = user.CheckFsRoot(connectionID)
  1540. defer user.CloseFs() //nolint:errcheck
  1541. if err != nil {
  1542. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1543. }
  1544. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1545. for _, item := range copy {
  1546. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1547. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1548. if strings.HasSuffix(item.Key, "/") {
  1549. source += "/"
  1550. }
  1551. if strings.HasSuffix(item.Value, "/") {
  1552. target += "/"
  1553. }
  1554. if err = conn.Copy(source, target); err != nil {
  1555. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1556. }
  1557. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1558. }
  1559. return nil
  1560. }
  1561. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1562. user dataprovider.User,
  1563. ) error {
  1564. user, err := getUserForEventAction(user)
  1565. if err != nil {
  1566. return err
  1567. }
  1568. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1569. err = user.CheckFsRoot(connectionID)
  1570. defer user.CloseFs() //nolint:errcheck
  1571. if err != nil {
  1572. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1573. }
  1574. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1575. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1576. if _, err = conn.DoStat(item, 0, false); err != nil {
  1577. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1578. }
  1579. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1580. }
  1581. return nil
  1582. }
  1583. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1584. conditions dataprovider.ConditionOptions, params *EventParams,
  1585. ) error {
  1586. users, err := params.getUsers()
  1587. if err != nil {
  1588. return fmt.Errorf("unable to get users: %w", err)
  1589. }
  1590. var failures []string
  1591. executed := 0
  1592. for _, user := range users {
  1593. // if sender is set, the conditions have already been evaluated
  1594. if params.sender == "" {
  1595. if !checkUserConditionOptions(&user, &conditions) {
  1596. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1597. user.Username)
  1598. continue
  1599. }
  1600. }
  1601. executed++
  1602. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1603. failures = append(failures, user.Username)
  1604. params.AddError(err)
  1605. }
  1606. }
  1607. if len(failures) > 0 {
  1608. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1609. }
  1610. if executed == 0 {
  1611. eventManagerLog(logger.LevelError, "no rename executed")
  1612. return errors.New("no rename executed")
  1613. }
  1614. return nil
  1615. }
  1616. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1617. conditions dataprovider.ConditionOptions, params *EventParams,
  1618. ) error {
  1619. users, err := params.getUsers()
  1620. if err != nil {
  1621. return fmt.Errorf("unable to get users: %w", err)
  1622. }
  1623. var failures []string
  1624. var executed int
  1625. for _, user := range users {
  1626. // if sender is set, the conditions have already been evaluated
  1627. if params.sender == "" {
  1628. if !checkUserConditionOptions(&user, &conditions) {
  1629. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1630. user.Username)
  1631. continue
  1632. }
  1633. }
  1634. executed++
  1635. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1636. failures = append(failures, user.Username)
  1637. params.AddError(err)
  1638. }
  1639. }
  1640. if len(failures) > 0 {
  1641. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1642. }
  1643. if executed == 0 {
  1644. eventManagerLog(logger.LevelError, "no copy executed")
  1645. return errors.New("no copy executed")
  1646. }
  1647. return nil
  1648. }
  1649. func getArchiveBaseDir(paths []string) string {
  1650. var parentDirs []string
  1651. for _, p := range paths {
  1652. parentDirs = append(parentDirs, path.Dir(p))
  1653. }
  1654. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1655. baseDir := "/"
  1656. if len(parentDirs) == 1 {
  1657. baseDir = parentDirs[0]
  1658. }
  1659. return baseDir
  1660. }
  1661. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1662. if info.IsDir() {
  1663. var dirSize int64
  1664. entries, err := conn.ListDir(p)
  1665. if err != nil {
  1666. return 0, err
  1667. }
  1668. for _, entry := range entries {
  1669. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1670. if err != nil {
  1671. return 0, err
  1672. }
  1673. dirSize += size
  1674. }
  1675. return dirSize, nil
  1676. }
  1677. if info.Mode().IsRegular() {
  1678. return info.Size(), nil
  1679. }
  1680. return 0, nil
  1681. }
  1682. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1683. q, _ := conn.HasSpace(false, false, zipPath)
  1684. if q.HasSpace && q.GetRemainingSize() > 0 {
  1685. var size int64
  1686. for _, item := range paths {
  1687. info, err := conn.DoStat(item, 1, false)
  1688. if err != nil {
  1689. return size, err
  1690. }
  1691. itemSize, err := getSizeForPath(conn, item, info)
  1692. if err != nil {
  1693. return size, err
  1694. }
  1695. size += itemSize
  1696. }
  1697. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1698. // we assume the zip size will be half of the real size
  1699. return size / 2, nil
  1700. }
  1701. return -1, nil
  1702. }
  1703. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1704. user dataprovider.User,
  1705. ) error {
  1706. user, err := getUserForEventAction(user)
  1707. if err != nil {
  1708. return err
  1709. }
  1710. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1711. err = user.CheckFsRoot(connectionID)
  1712. defer user.CloseFs() //nolint:errcheck
  1713. if err != nil {
  1714. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1715. }
  1716. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1717. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1718. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1719. paths := make([]string, 0, len(c.Paths))
  1720. for idx := range c.Paths {
  1721. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1722. if p == name {
  1723. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1724. }
  1725. paths = append(paths, p)
  1726. }
  1727. paths = util.RemoveDuplicates(paths, false)
  1728. estimatedSize, err := estimateZipSize(conn, name, paths)
  1729. if err != nil {
  1730. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1731. return fmt.Errorf("unable to estimate archive size: %w", err)
  1732. }
  1733. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1734. if err != nil {
  1735. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1736. return fmt.Errorf("unable to create archive: %w", err)
  1737. }
  1738. defer cancelFn()
  1739. baseDir := getArchiveBaseDir(paths)
  1740. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1741. zipWriter := &zipWriterWrapper{
  1742. Name: name,
  1743. Writer: zip.NewWriter(writer),
  1744. Entries: make(map[string]bool),
  1745. }
  1746. startTime := time.Now()
  1747. for _, item := range paths {
  1748. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1749. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1750. return err
  1751. }
  1752. }
  1753. if err := zipWriter.Writer.Close(); err != nil {
  1754. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1755. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1756. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1757. }
  1758. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1759. }
  1760. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1761. params *EventParams,
  1762. ) error {
  1763. users, err := params.getUsers()
  1764. if err != nil {
  1765. return fmt.Errorf("unable to get users: %w", err)
  1766. }
  1767. var failures []string
  1768. executed := 0
  1769. for _, user := range users {
  1770. // if sender is set, the conditions have already been evaluated
  1771. if params.sender == "" {
  1772. if !checkUserConditionOptions(&user, &conditions) {
  1773. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1774. user.Username)
  1775. continue
  1776. }
  1777. }
  1778. executed++
  1779. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1780. failures = append(failures, user.Username)
  1781. params.AddError(err)
  1782. }
  1783. }
  1784. if len(failures) > 0 {
  1785. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1786. }
  1787. if executed == 0 {
  1788. eventManagerLog(logger.LevelError, "no existence check executed")
  1789. return errors.New("no existence check executed")
  1790. }
  1791. return nil
  1792. }
  1793. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1794. conditions dataprovider.ConditionOptions, params *EventParams,
  1795. ) error {
  1796. users, err := params.getUsers()
  1797. if err != nil {
  1798. return fmt.Errorf("unable to get users: %w", err)
  1799. }
  1800. var failures []string
  1801. executed := 0
  1802. for _, user := range users {
  1803. // if sender is set, the conditions have already been evaluated
  1804. if params.sender == "" {
  1805. if !checkUserConditionOptions(&user, &conditions) {
  1806. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1807. user.Username)
  1808. continue
  1809. }
  1810. }
  1811. executed++
  1812. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1813. failures = append(failures, user.Username)
  1814. params.AddError(err)
  1815. }
  1816. }
  1817. if len(failures) > 0 {
  1818. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1819. }
  1820. if executed == 0 {
  1821. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1822. return errors.New("no file/folder compressed")
  1823. }
  1824. return nil
  1825. }
  1826. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1827. params *EventParams,
  1828. ) error {
  1829. addObjectData := false
  1830. replacements := params.getStringReplacements(addObjectData, false)
  1831. replacer := strings.NewReplacer(replacements...)
  1832. switch c.Type {
  1833. case dataprovider.FilesystemActionRename:
  1834. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1835. case dataprovider.FilesystemActionDelete:
  1836. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1837. case dataprovider.FilesystemActionMkdirs:
  1838. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1839. case dataprovider.FilesystemActionExist:
  1840. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1841. case dataprovider.FilesystemActionCompress:
  1842. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1843. case dataprovider.FilesystemActionCopy:
  1844. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1845. default:
  1846. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1847. }
  1848. }
  1849. func executeQuotaResetForUser(user *dataprovider.User) error {
  1850. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1851. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1852. user.Username, err)
  1853. return err
  1854. }
  1855. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1856. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1857. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1858. }
  1859. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1860. numFiles, size, err := user.ScanQuota()
  1861. if err != nil {
  1862. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1863. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1864. }
  1865. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1866. if err != nil {
  1867. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1868. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1869. }
  1870. return nil
  1871. }
  1872. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1873. users, err := params.getUsers()
  1874. if err != nil {
  1875. return fmt.Errorf("unable to get users: %w", err)
  1876. }
  1877. var failures []string
  1878. executed := 0
  1879. for _, user := range users {
  1880. // if sender is set, the conditions have already been evaluated
  1881. if params.sender == "" {
  1882. if !checkUserConditionOptions(&user, &conditions) {
  1883. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  1884. user.Username)
  1885. continue
  1886. }
  1887. }
  1888. executed++
  1889. if err = executeQuotaResetForUser(&user); err != nil {
  1890. params.AddError(err)
  1891. failures = append(failures, user.Username)
  1892. }
  1893. }
  1894. if len(failures) > 0 {
  1895. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  1896. }
  1897. if executed == 0 {
  1898. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1899. return errors.New("no user quota reset executed")
  1900. }
  1901. return nil
  1902. }
  1903. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1904. folders, err := params.getFolders()
  1905. if err != nil {
  1906. return fmt.Errorf("unable to get folders: %w", err)
  1907. }
  1908. var failures []string
  1909. executed := 0
  1910. for _, folder := range folders {
  1911. // if sender is set, the conditions have already been evaluated
  1912. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1913. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1914. folder.Name)
  1915. continue
  1916. }
  1917. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1918. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1919. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1920. failures = append(failures, folder.Name)
  1921. continue
  1922. }
  1923. executed++
  1924. f := vfs.VirtualFolder{
  1925. BaseVirtualFolder: folder,
  1926. VirtualPath: "/",
  1927. }
  1928. numFiles, size, err := f.ScanQuota()
  1929. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1930. if err != nil {
  1931. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1932. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1933. failures = append(failures, folder.Name)
  1934. continue
  1935. }
  1936. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1937. if err != nil {
  1938. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1939. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1940. failures = append(failures, folder.Name)
  1941. }
  1942. }
  1943. if len(failures) > 0 {
  1944. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  1945. }
  1946. if executed == 0 {
  1947. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1948. return errors.New("no folder quota reset executed")
  1949. }
  1950. return nil
  1951. }
  1952. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1953. users, err := params.getUsers()
  1954. if err != nil {
  1955. return fmt.Errorf("unable to get users: %w", err)
  1956. }
  1957. var failures []string
  1958. executed := 0
  1959. for _, user := range users {
  1960. // if sender is set, the conditions have already been evaluated
  1961. if params.sender == "" {
  1962. if !checkUserConditionOptions(&user, &conditions) {
  1963. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  1964. user.Username)
  1965. continue
  1966. }
  1967. }
  1968. executed++
  1969. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1970. if err != nil {
  1971. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1972. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1973. failures = append(failures, user.Username)
  1974. }
  1975. }
  1976. if len(failures) > 0 {
  1977. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  1978. }
  1979. if executed == 0 {
  1980. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1981. return errors.New("no transfer quota reset executed")
  1982. }
  1983. return nil
  1984. }
  1985. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  1986. params *EventParams, actionName string,
  1987. ) error {
  1988. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1989. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  1990. user.Username, err)
  1991. return err
  1992. }
  1993. check := RetentionCheck{
  1994. Folders: folders,
  1995. }
  1996. c := RetentionChecks.Add(check, &user)
  1997. if c == nil {
  1998. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  1999. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2000. }
  2001. defer func() {
  2002. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2003. Username: user.Username,
  2004. ActionName: actionName,
  2005. Results: c.results,
  2006. })
  2007. }()
  2008. if err := c.Start(); err != nil {
  2009. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2010. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2011. }
  2012. return nil
  2013. }
  2014. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2015. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2016. ) error {
  2017. users, err := params.getUsers()
  2018. if err != nil {
  2019. return fmt.Errorf("unable to get users: %w", err)
  2020. }
  2021. var failures []string
  2022. executed := 0
  2023. for _, user := range users {
  2024. // if sender is set, the conditions have already been evaluated
  2025. if params.sender == "" {
  2026. if !checkUserConditionOptions(&user, &conditions) {
  2027. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2028. user.Username)
  2029. continue
  2030. }
  2031. }
  2032. executed++
  2033. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2034. failures = append(failures, user.Username)
  2035. params.AddError(err)
  2036. }
  2037. }
  2038. if len(failures) > 0 {
  2039. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2040. }
  2041. if executed == 0 {
  2042. eventManagerLog(logger.LevelError, "no retention check executed")
  2043. return errors.New("no retention check executed")
  2044. }
  2045. return nil
  2046. }
  2047. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2048. users, err := params.getUsers()
  2049. if err != nil {
  2050. return fmt.Errorf("unable to get users: %w", err)
  2051. }
  2052. var failures []string
  2053. var executed int
  2054. for _, user := range users {
  2055. // if sender is set, the conditions have already been evaluated
  2056. if params.sender == "" {
  2057. if !checkUserConditionOptions(&user, &conditions) {
  2058. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2059. user.Username)
  2060. continue
  2061. }
  2062. }
  2063. executed++
  2064. if user.ExpirationDate > 0 {
  2065. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2066. if expDate.Before(time.Now()) {
  2067. failures = append(failures, user.Username)
  2068. }
  2069. }
  2070. }
  2071. if len(failures) > 0 {
  2072. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2073. }
  2074. if executed == 0 {
  2075. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2076. return errors.New("no user expiration check executed")
  2077. }
  2078. return nil
  2079. }
  2080. func executeMetadataCheckForUser(user *dataprovider.User) error {
  2081. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2082. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  2083. user.Username, err)
  2084. return err
  2085. }
  2086. if !ActiveMetadataChecks.Add(user.Username, user.Role) {
  2087. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  2088. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  2089. }
  2090. defer ActiveMetadataChecks.Remove(user.Username)
  2091. if err := user.CheckMetadataConsistency(); err != nil {
  2092. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  2093. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  2094. }
  2095. return nil
  2096. }
  2097. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2098. users, err := params.getUsers()
  2099. if err != nil {
  2100. return fmt.Errorf("unable to get users: %w", err)
  2101. }
  2102. var failures []string
  2103. var executed int
  2104. for _, user := range users {
  2105. // if sender is set, the conditions have already been evaluated
  2106. if params.sender == "" {
  2107. if !checkUserConditionOptions(&user, &conditions) {
  2108. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, condition options don't match",
  2109. user.Username)
  2110. continue
  2111. }
  2112. }
  2113. executed++
  2114. if err = executeMetadataCheckForUser(&user); err != nil {
  2115. params.AddError(err)
  2116. failures = append(failures, user.Username)
  2117. }
  2118. }
  2119. if len(failures) > 0 {
  2120. return fmt.Errorf("metadata check failed for users: %s", strings.Join(failures, ", "))
  2121. }
  2122. if executed == 0 {
  2123. eventManagerLog(logger.LevelError, "no metadata check executed")
  2124. return errors.New("no metadata check executed")
  2125. }
  2126. return nil
  2127. }
  2128. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2129. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2130. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2131. user.Username, err)
  2132. return err
  2133. }
  2134. if user.ExpirationDate > 0 {
  2135. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2136. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2137. user.Username, expDate)
  2138. return nil
  2139. }
  2140. }
  2141. if user.Filters.PasswordExpiration == 0 {
  2142. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2143. return nil
  2144. }
  2145. days := user.PasswordExpiresIn()
  2146. if days > config.Threshold {
  2147. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2148. user.Username, days, config.Threshold)
  2149. return nil
  2150. }
  2151. body := new(bytes.Buffer)
  2152. data := make(map[string]any)
  2153. data["Username"] = user.Username
  2154. data["Days"] = days
  2155. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2156. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2157. user.Username, err)
  2158. return err
  2159. }
  2160. subject := "SFTPGo password expiration notification"
  2161. startTime := time.Now()
  2162. if err := smtp.SendEmail([]string{user.Email}, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2163. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2164. user.Username, err, time.Since(startTime))
  2165. return err
  2166. }
  2167. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2168. user.Username, days, time.Since(startTime))
  2169. return nil
  2170. }
  2171. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2172. params *EventParams) error {
  2173. users, err := params.getUsers()
  2174. if err != nil {
  2175. return fmt.Errorf("unable to get users: %w", err)
  2176. }
  2177. var failures []string
  2178. for _, user := range users {
  2179. // if sender is set, the conditions have already been evaluated
  2180. if params.sender == "" {
  2181. if !checkUserConditionOptions(&user, &conditions) {
  2182. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2183. user.Username)
  2184. continue
  2185. }
  2186. }
  2187. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2188. params.AddError(err)
  2189. failures = append(failures, user.Username)
  2190. }
  2191. }
  2192. if len(failures) > 0 {
  2193. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2194. }
  2195. return nil
  2196. }
  2197. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2198. admin, err := dataprovider.AdminExists(params.Name)
  2199. exists := err == nil
  2200. if exists && c.Mode == 1 {
  2201. return &admin, nil
  2202. }
  2203. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2204. return nil, err
  2205. }
  2206. replacements := params.getStringReplacements(false, true)
  2207. replacer := strings.NewReplacer(replacements...)
  2208. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2209. var newAdmin dataprovider.Admin
  2210. err = json.Unmarshal([]byte(data), &newAdmin)
  2211. if err != nil {
  2212. return nil, err
  2213. }
  2214. if newAdmin.Password == "" {
  2215. newAdmin.Password = util.GenerateUniqueID()
  2216. }
  2217. if exists {
  2218. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2219. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2220. } else {
  2221. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2222. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2223. }
  2224. return &newAdmin, err
  2225. }
  2226. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2227. user, err := dataprovider.UserExists(params.Name, "")
  2228. exists := err == nil
  2229. if exists && c.Mode == 1 {
  2230. err = user.LoadAndApplyGroupSettings()
  2231. return &user, err
  2232. }
  2233. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2234. return nil, err
  2235. }
  2236. replacements := params.getStringReplacements(false, true)
  2237. replacer := strings.NewReplacer(replacements...)
  2238. data := replaceWithReplacer(c.TemplateUser, replacer)
  2239. var newUser dataprovider.User
  2240. err = json.Unmarshal([]byte(data), &newUser)
  2241. if err != nil {
  2242. return nil, err
  2243. }
  2244. if exists {
  2245. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2246. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2247. } else {
  2248. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2249. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2250. }
  2251. if err != nil {
  2252. return nil, err
  2253. }
  2254. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2255. return &u, err
  2256. }
  2257. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  2258. conditions dataprovider.ConditionOptions,
  2259. ) error {
  2260. var err error
  2261. switch action.Type {
  2262. case dataprovider.ActionTypeHTTP:
  2263. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2264. case dataprovider.ActionTypeCommand:
  2265. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2266. case dataprovider.ActionTypeEmail:
  2267. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2268. case dataprovider.ActionTypeBackup:
  2269. var backupPath string
  2270. backupPath, err = dataprovider.ExecuteBackup()
  2271. if err == nil {
  2272. params.setBackupParams(backupPath)
  2273. }
  2274. case dataprovider.ActionTypeUserQuotaReset:
  2275. err = executeUsersQuotaResetRuleAction(conditions, params)
  2276. case dataprovider.ActionTypeFolderQuotaReset:
  2277. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2278. case dataprovider.ActionTypeTransferQuotaReset:
  2279. err = executeTransferQuotaResetRuleAction(conditions, params)
  2280. case dataprovider.ActionTypeDataRetentionCheck:
  2281. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2282. case dataprovider.ActionTypeMetadataCheck:
  2283. err = executeMetadataCheckRuleAction(conditions, params)
  2284. case dataprovider.ActionTypeFilesystem:
  2285. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2286. case dataprovider.ActionTypePasswordExpirationCheck:
  2287. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2288. case dataprovider.ActionTypeUserExpirationCheck:
  2289. err = executeUserExpirationCheckRuleAction(conditions, params)
  2290. default:
  2291. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2292. }
  2293. if err != nil {
  2294. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2295. }
  2296. params.AddError(err)
  2297. return err
  2298. }
  2299. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2300. *dataprovider.Admin, error,
  2301. ) {
  2302. for _, action := range rule.Actions {
  2303. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2304. startTime := time.Now()
  2305. var user *dataprovider.User
  2306. var admin *dataprovider.Admin
  2307. var err error
  2308. var failedActions []string
  2309. paramsCopy := params.getACopy()
  2310. switch params.Event {
  2311. case IDPLoginAdmin:
  2312. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2313. case IDPLoginUser:
  2314. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2315. default:
  2316. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2317. }
  2318. if err != nil {
  2319. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2320. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2321. failedActions = append(failedActions, action.Name)
  2322. } else {
  2323. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2324. action.Name, rule.Name, time.Since(startTime))
  2325. }
  2326. // execute async actions if any, including failure actions
  2327. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2328. return user, admin, err
  2329. }
  2330. }
  2331. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2332. return nil, nil, errors.New("no action executed")
  2333. }
  2334. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2335. var errRes error
  2336. for _, rule := range rules {
  2337. var failedActions []string
  2338. paramsCopy := params.getACopy()
  2339. for _, action := range rule.Actions {
  2340. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2341. startTime := time.Now()
  2342. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2343. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2344. action.Name, rule.Name, time.Since(startTime), err)
  2345. failedActions = append(failedActions, action.Name)
  2346. // we return the last error, it is ok for now
  2347. errRes = err
  2348. if action.Options.StopOnFailure {
  2349. break
  2350. }
  2351. } else {
  2352. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2353. action.Name, rule.Name, time.Since(startTime))
  2354. }
  2355. }
  2356. }
  2357. // execute async actions if any, including failure actions
  2358. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2359. }
  2360. return errRes
  2361. }
  2362. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2363. eventManager.addAsyncTask()
  2364. defer eventManager.removeAsyncTask()
  2365. for _, rule := range rules {
  2366. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2367. }
  2368. }
  2369. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2370. for _, action := range rule.Actions {
  2371. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2372. startTime := time.Now()
  2373. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2374. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2375. action.Name, rule.Name, time.Since(startTime), err)
  2376. failedActions = append(failedActions, action.Name)
  2377. if action.Options.StopOnFailure {
  2378. break
  2379. }
  2380. } else {
  2381. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2382. action.Name, rule.Name, time.Since(startTime))
  2383. }
  2384. }
  2385. }
  2386. if len(failedActions) > 0 {
  2387. params.updateStatusFromError = false
  2388. // execute failure actions
  2389. for _, action := range rule.Actions {
  2390. if action.Options.IsFailureAction {
  2391. startTime := time.Now()
  2392. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2393. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2394. action.Name, rule.Name, time.Since(startTime), err)
  2395. if action.Options.StopOnFailure {
  2396. break
  2397. }
  2398. } else {
  2399. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2400. action.Name, rule.Name, time.Since(startTime))
  2401. }
  2402. }
  2403. }
  2404. }
  2405. }
  2406. type eventCronJob struct {
  2407. ruleName string
  2408. }
  2409. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2410. if rule.GuardFromConcurrentExecution() {
  2411. task, err := dataprovider.GetTaskByName(rule.Name)
  2412. if err != nil {
  2413. if errors.Is(err, util.ErrNotFound) {
  2414. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2415. task = dataprovider.Task{
  2416. Name: rule.Name,
  2417. UpdateAt: 0,
  2418. Version: 0,
  2419. }
  2420. err = dataprovider.AddTask(rule.Name)
  2421. if err != nil {
  2422. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2423. return task, err
  2424. }
  2425. } else {
  2426. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2427. }
  2428. }
  2429. return task, err
  2430. }
  2431. return dataprovider.Task{}, nil
  2432. }
  2433. func (j *eventCronJob) Run() {
  2434. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2435. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2436. if err != nil {
  2437. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2438. return
  2439. }
  2440. if err := rule.CheckActionsConsistency(""); err != nil {
  2441. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2442. return
  2443. }
  2444. task, err := j.getTask(&rule)
  2445. if err != nil {
  2446. return
  2447. }
  2448. if task.Name != "" {
  2449. updateInterval := 5 * time.Minute
  2450. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2451. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2452. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2453. return
  2454. }
  2455. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2456. if err != nil {
  2457. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2458. rule.Name, err)
  2459. return
  2460. }
  2461. ticker := time.NewTicker(updateInterval)
  2462. done := make(chan bool)
  2463. defer func() {
  2464. done <- true
  2465. ticker.Stop()
  2466. }()
  2467. go func(taskName string) {
  2468. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2469. for {
  2470. select {
  2471. case <-done:
  2472. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2473. return
  2474. case <-ticker.C:
  2475. err := dataprovider.UpdateTaskTimestamp(taskName)
  2476. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2477. }
  2478. }
  2479. }(task.Name)
  2480. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2481. } else {
  2482. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2483. }
  2484. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2485. }
  2486. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2487. func RunOnDemandRule(name string) error {
  2488. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2489. rule, err := dataprovider.EventRuleExists(name)
  2490. if err != nil {
  2491. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2492. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2493. }
  2494. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2495. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2496. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2497. }
  2498. if rule.Status != 1 {
  2499. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2500. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2501. }
  2502. if err := rule.CheckActionsConsistency(""); err != nil {
  2503. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2504. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2505. }
  2506. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2507. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2508. return nil
  2509. }
  2510. type zipWriterWrapper struct {
  2511. Name string
  2512. Entries map[string]bool
  2513. Writer *zip.Writer
  2514. }
  2515. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2516. logger.Log(level, "eventmanager", "", format, v...)
  2517. }