eventmanager.go 95 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "mime"
  24. "mime/multipart"
  25. "net/http"
  26. "net/textproto"
  27. "net/url"
  28. "os"
  29. "os/exec"
  30. "path"
  31. "path/filepath"
  32. "slices"
  33. "strconv"
  34. "strings"
  35. "sync"
  36. "sync/atomic"
  37. "time"
  38. "github.com/bmatcuk/doublestar/v4"
  39. "github.com/klauspost/compress/zip"
  40. "github.com/robfig/cron/v3"
  41. "github.com/rs/xid"
  42. "github.com/sftpgo/sdk"
  43. "github.com/wneessen/go-mail"
  44. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  45. "github.com/drakkan/sftpgo/v2/internal/logger"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/smtp"
  48. "github.com/drakkan/sftpgo/v2/internal/util"
  49. "github.com/drakkan/sftpgo/v2/internal/vfs"
  50. )
// Internal constants used by the event manager.
const (
	// ipBlockedEventName is the literal name used for "IP blocked" events.
	ipBlockedEventName = "IP Blocked"
	// maxAttachmentsSize is 10 MiB expressed as an int64.
	// NOTE(review): presumably the cap on the total size of email
	// attachments; the enforcement point is outside this section.
	maxAttachmentsSize = int64(10 * 1024 * 1024)
	// objDataPlaceholder and objDataPlaceholderString are template
	// placeholders; their expansion is implemented outside this section.
	objDataPlaceholder       = "{{ObjectData}}"
	objDataPlaceholderString = "{{ObjectDataString}}"
	// dateTimeMillisFormat is a time layout with millisecond precision and
	// no time zone designator.
	dateTimeMillisFormat = "2006-01-02T15:04:05.000"
)
// Supported IDP login events.
// These are the values compared against EventParams.Event when matching
// IDP login rules.
const (
	// IDPLoginUser identifies a login performed by a user account.
	IDPLoginUser = "IDP login user"
	// IDPLoginAdmin identifies a login performed by an admin account.
	IDPLoginAdmin = "IDP login admin"
)
var (
	// eventManager handles the supported event rules actions.
	// It is initialized in init.
	eventManager eventRulesContainer
	// multipartQuoteEscaper escapes backslashes and double quotes by
	// prefixing them with a backslash.
	multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
	// fsEventsWithSize lists the filesystem events for which the file size
	// conditions (min/max) are evaluated when matching a rule.
	fsEventsWithSize = []string{operationPreDelete, OperationPreUpload, operationDelete,
		operationCopy, operationDownload, operationFirstUpload, operationFirstDownload,
		operationUpload}
)
  71. func init() {
  72. eventManager = eventRulesContainer{
  73. schedulesMapping: make(map[string][]cron.EntryID),
  74. // arbitrary maximum number of concurrent asynchronous tasks,
  75. // each task could execute multiple actions
  76. concurrencyGuard: make(chan struct{}, 200),
  77. }
  78. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  79. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  80. p := EventParams{
  81. Name: executor,
  82. ObjectName: objectName,
  83. Event: operation,
  84. Status: 1,
  85. ObjectType: objectType,
  86. IP: ip,
  87. Role: role,
  88. Timestamp: time.Now(),
  89. Object: object,
  90. }
  91. if u, ok := object.(*dataprovider.User); ok {
  92. p.Email = u.Email
  93. p.Groups = u.Groups
  94. } else if a, ok := object.(*dataprovider.Admin); ok {
  95. p.Email = a.Email
  96. }
  97. eventManager.handleProviderEvent(p)
  98. })
  99. }
// HandleCertificateEvent checks and executes action rules for certificate events.
// It delegates to the package-level event manager.
func HandleCertificateEvent(params EventParams) {
	eventManager.handleCertificateEvent(params)
}
// HandleIDPLoginEvent executes actions defined for a successful login from an Identity Provider.
// customFields may be nil; its string values are made available to the
// executed actions. The results are those returned by the package-level event
// manager: all nil when no rule matches, otherwise the outcome of the
// matching synchronous rule, if any.
func HandleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User, *dataprovider.Admin, error) {
	return eventManager.handleIDPLoginEvent(params, customFields)
}
// eventRulesContainer stores event rules by trigger.
// The embedded RWMutex guards the rule slices and schedulesMapping.
type eventRulesContainer struct {
	sync.RWMutex
	// lastLoad is the time of the last successful rules load, as
	// milliseconds since epoch.
	lastLoad atomic.Int64
	// Active rules, one slice per trigger type.
	FsEvents          []dataprovider.EventRule
	ProviderEvents    []dataprovider.EventRule
	Schedules         []dataprovider.EventRule
	IPBlockedEvents   []dataprovider.EventRule
	CertificateEvents []dataprovider.EventRule
	// IPDLoginEvents holds the rules triggered by IDP login events
	// (the field name spells "IPD").
	IPDLoginEvents []dataprovider.EventRule
	// schedulesMapping maps a rule name to the cron entry IDs registered for it.
	schedulesMapping map[string][]cron.EntryID
	// concurrencyGuard is a buffered channel used as a semaphore for
	// asynchronous tasks.
	concurrencyGuard chan struct{}
}
// addAsyncTask registers a new asynchronous task: it increments the active
// hooks counter and then acquires a slot from the concurrency guard,
// blocking while the guard channel is full.
func (r *eventRulesContainer) addAsyncTask() {
	activeHooks.Add(1)
	r.concurrencyGuard <- struct{}{}
}
// removeAsyncTask marks an asynchronous task as completed: it decrements the
// active hooks counter and releases a slot of the concurrency guard.
func (r *eventRulesContainer) removeAsyncTask() {
	activeHooks.Add(-1)
	<-r.concurrencyGuard
}
// getLastLoadTime atomically returns the value stored by setLastLoadTime.
func (r *eventRulesContainer) getLastLoadTime() int64 {
	return r.lastLoad.Load()
}
// setLastLoadTime atomically stores modTime as the last rules load time.
func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
	r.lastLoad.Store(modTime)
}
  135. // RemoveRule deletes the rule with the specified name
  136. func (r *eventRulesContainer) RemoveRule(name string) {
  137. r.Lock()
  138. defer r.Unlock()
  139. r.removeRuleInternal(name)
  140. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  141. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  142. }
  143. func (r *eventRulesContainer) removeRuleInternal(name string) {
  144. for idx := range r.FsEvents {
  145. if r.FsEvents[idx].Name == name {
  146. lastIdx := len(r.FsEvents) - 1
  147. r.FsEvents[idx] = r.FsEvents[lastIdx]
  148. r.FsEvents = r.FsEvents[:lastIdx]
  149. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  150. return
  151. }
  152. }
  153. for idx := range r.ProviderEvents {
  154. if r.ProviderEvents[idx].Name == name {
  155. lastIdx := len(r.ProviderEvents) - 1
  156. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  157. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  158. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  159. return
  160. }
  161. }
  162. for idx := range r.IPBlockedEvents {
  163. if r.IPBlockedEvents[idx].Name == name {
  164. lastIdx := len(r.IPBlockedEvents) - 1
  165. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  166. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  167. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  168. return
  169. }
  170. }
  171. for idx := range r.CertificateEvents {
  172. if r.CertificateEvents[idx].Name == name {
  173. lastIdx := len(r.CertificateEvents) - 1
  174. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  175. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  176. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  177. return
  178. }
  179. }
  180. for idx := range r.IPDLoginEvents {
  181. if r.IPDLoginEvents[idx].Name == name {
  182. lastIdx := len(r.IPDLoginEvents) - 1
  183. r.IPDLoginEvents[idx] = r.IPDLoginEvents[lastIdx]
  184. r.IPDLoginEvents = r.IPDLoginEvents[:lastIdx]
  185. eventManagerLog(logger.LevelDebug, "removed rule %q from IDP login events", name)
  186. return
  187. }
  188. }
  189. for idx := range r.Schedules {
  190. if r.Schedules[idx].Name == name {
  191. if schedules, ok := r.schedulesMapping[name]; ok {
  192. for _, entryID := range schedules {
  193. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  194. eventScheduler.Remove(entryID)
  195. }
  196. delete(r.schedulesMapping, name)
  197. }
  198. lastIdx := len(r.Schedules) - 1
  199. r.Schedules[idx] = r.Schedules[lastIdx]
  200. r.Schedules = r.Schedules[:lastIdx]
  201. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  202. return
  203. }
  204. }
  205. }
  206. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  207. r.removeRuleInternal(rule.Name)
  208. if rule.DeletedAt > 0 {
  209. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  210. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  211. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  212. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  213. }
  214. return
  215. }
  216. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  217. return
  218. }
  219. switch rule.Trigger {
  220. case dataprovider.EventTriggerFsEvent:
  221. r.FsEvents = append(r.FsEvents, rule)
  222. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  223. case dataprovider.EventTriggerProviderEvent:
  224. r.ProviderEvents = append(r.ProviderEvents, rule)
  225. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  226. case dataprovider.EventTriggerIPBlocked:
  227. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  228. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  229. case dataprovider.EventTriggerCertificate:
  230. r.CertificateEvents = append(r.CertificateEvents, rule)
  231. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  232. case dataprovider.EventTriggerIDPLogin:
  233. r.IPDLoginEvents = append(r.IPDLoginEvents, rule)
  234. eventManagerLog(logger.LevelDebug, "added rule %q to IDP login events", rule.Name)
  235. case dataprovider.EventTriggerSchedule:
  236. for _, schedule := range rule.Conditions.Schedules {
  237. cronSpec := schedule.GetCronSpec()
  238. job := &eventCronJob{
  239. ruleName: dataprovider.ConvertName(rule.Name),
  240. }
  241. entryID, err := eventScheduler.AddJob(cronSpec, job)
  242. if err != nil {
  243. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  244. return
  245. }
  246. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  247. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  248. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  249. }
  250. r.Schedules = append(r.Schedules, rule)
  251. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  252. default:
  253. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  254. }
  255. }
  256. func (r *eventRulesContainer) loadRules() {
  257. eventManagerLog(logger.LevelDebug, "loading updated rules")
  258. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  259. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  260. if err != nil {
  261. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  262. return
  263. }
  264. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  265. if len(rules) > 0 {
  266. r.Lock()
  267. defer r.Unlock()
  268. for _, rule := range rules {
  269. r.addUpdateRuleInternal(rule)
  270. }
  271. }
  272. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d, IDP login events: %d",
  273. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents), len(r.IPDLoginEvents))
  274. r.setLastLoadTime(modTime)
  275. }
  276. func (*eventRulesContainer) checkIPDLoginEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  277. switch conditions.IDPLoginEvent {
  278. case dataprovider.IDPLoginUser:
  279. if params.Event != IDPLoginUser {
  280. return false
  281. }
  282. case dataprovider.IDPLoginAdmin:
  283. if params.Event != IDPLoginAdmin {
  284. return false
  285. }
  286. }
  287. return checkEventConditionPatterns(params.Name, conditions.Options.Names)
  288. }
  289. func (*eventRulesContainer) checkProviderEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  290. if !util.Contains(conditions.ProviderEvents, params.Event) {
  291. return false
  292. }
  293. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  294. return false
  295. }
  296. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  297. return false
  298. }
  299. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  300. return false
  301. }
  302. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  303. return false
  304. }
  305. return true
  306. }
  307. func (*eventRulesContainer) checkFsEventMatch(conditions *dataprovider.EventConditions, params *EventParams) bool {
  308. if !util.Contains(conditions.FsEvents, params.Event) {
  309. return false
  310. }
  311. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  312. return false
  313. }
  314. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  315. return false
  316. }
  317. if !checkEventGroupConditionPatterns(params.Groups, conditions.Options.GroupNames) {
  318. return false
  319. }
  320. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  321. return false
  322. }
  323. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  324. return false
  325. }
  326. if slices.Contains(fsEventsWithSize, params.Event) {
  327. if conditions.Options.MinFileSize > 0 {
  328. if params.FileSize < conditions.Options.MinFileSize {
  329. return false
  330. }
  331. }
  332. if conditions.Options.MaxFileSize > 0 {
  333. if params.FileSize > conditions.Options.MaxFileSize {
  334. return false
  335. }
  336. }
  337. }
  338. return true
  339. }
  340. // hasFsRules returns true if there are any rules for filesystem event triggers
  341. func (r *eventRulesContainer) hasFsRules() bool {
  342. r.RLock()
  343. defer r.RUnlock()
  344. return len(r.FsEvents) > 0
  345. }
  346. // handleFsEvent executes the rules actions defined for the specified event.
  347. // The boolean parameter indicates whether a sync action was executed
  348. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  349. if params.Protocol == protocolEventAction {
  350. return false, nil
  351. }
  352. r.RLock()
  353. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  354. for _, rule := range r.FsEvents {
  355. if r.checkFsEventMatch(&rule.Conditions, &params) {
  356. if err := rule.CheckActionsConsistency(""); err != nil {
  357. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  358. rule.Name, err, params.Event)
  359. continue
  360. }
  361. hasSyncActions := false
  362. for _, action := range rule.Actions {
  363. if action.Options.ExecuteSync {
  364. hasSyncActions = true
  365. break
  366. }
  367. }
  368. if hasSyncActions {
  369. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  370. } else {
  371. rulesAsync = append(rulesAsync, rule)
  372. }
  373. }
  374. }
  375. r.RUnlock()
  376. params.sender = params.Name
  377. params.addUID()
  378. if len(rulesAsync) > 0 {
  379. go executeAsyncRulesActions(rulesAsync, params)
  380. }
  381. if len(rulesWithSyncActions) > 0 {
  382. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  383. }
  384. return false, nil
  385. }
  386. func (r *eventRulesContainer) handleIDPLoginEvent(params EventParams, customFields *map[string]any) (*dataprovider.User,
  387. *dataprovider.Admin, error,
  388. ) {
  389. r.RLock()
  390. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  391. for _, rule := range r.IPDLoginEvents {
  392. if r.checkIPDLoginEventMatch(&rule.Conditions, &params) {
  393. if err := rule.CheckActionsConsistency(""); err != nil {
  394. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  395. rule.Name, err, params.Event)
  396. continue
  397. }
  398. hasSyncActions := false
  399. for _, action := range rule.Actions {
  400. if action.Options.ExecuteSync {
  401. hasSyncActions = true
  402. break
  403. }
  404. }
  405. if hasSyncActions {
  406. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  407. } else {
  408. rulesAsync = append(rulesAsync, rule)
  409. }
  410. }
  411. }
  412. r.RUnlock()
  413. if len(rulesAsync) == 0 && len(rulesWithSyncActions) == 0 {
  414. return nil, nil, nil
  415. }
  416. params.addIDPCustomFields(customFields)
  417. if len(rulesWithSyncActions) > 1 {
  418. var ruleNames []string
  419. for _, r := range rulesWithSyncActions {
  420. ruleNames = append(ruleNames, r.Name)
  421. }
  422. return nil, nil, fmt.Errorf("more than one account check action rules matches: %q", strings.Join(ruleNames, ","))
  423. }
  424. params.addUID()
  425. if len(rulesAsync) > 0 {
  426. go executeAsyncRulesActions(rulesAsync, params)
  427. }
  428. if len(rulesWithSyncActions) > 0 {
  429. return executeIDPAccountCheckRule(rulesWithSyncActions[0], params)
  430. }
  431. return nil, nil, nil
  432. }
  433. // username is populated for user objects
  434. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  435. r.RLock()
  436. defer r.RUnlock()
  437. var rules []dataprovider.EventRule
  438. for _, rule := range r.ProviderEvents {
  439. if r.checkProviderEventMatch(&rule.Conditions, &params) {
  440. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  441. rules = append(rules, rule)
  442. } else {
  443. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  444. rule.Name, err, params.Event, params.ObjectType)
  445. }
  446. }
  447. }
  448. if len(rules) > 0 {
  449. params.sender = params.ObjectName
  450. go executeAsyncRulesActions(rules, params)
  451. }
  452. }
  453. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  454. r.RLock()
  455. defer r.RUnlock()
  456. if len(r.IPBlockedEvents) == 0 {
  457. return
  458. }
  459. var rules []dataprovider.EventRule
  460. for _, rule := range r.IPBlockedEvents {
  461. if err := rule.CheckActionsConsistency(""); err == nil {
  462. rules = append(rules, rule)
  463. } else {
  464. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  465. rule.Name, err, params.Event)
  466. }
  467. }
  468. if len(rules) > 0 {
  469. go executeAsyncRulesActions(rules, params)
  470. }
  471. }
  472. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  473. r.RLock()
  474. defer r.RUnlock()
  475. if len(r.CertificateEvents) == 0 {
  476. return
  477. }
  478. var rules []dataprovider.EventRule
  479. for _, rule := range r.CertificateEvents {
  480. if err := rule.CheckActionsConsistency(""); err == nil {
  481. rules = append(rules, rule)
  482. } else {
  483. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  484. rule.Name, err, params.Event)
  485. }
  486. }
  487. if len(rules) > 0 {
  488. go executeAsyncRulesActions(rules, params)
  489. }
  490. }
// executedRetentionCheck groups the per-folder results of a retention check
// with the username and the action name it refers to.
type executedRetentionCheck struct {
	Username   string
	ActionName string
	Results    []folderRetentionCheckResult
}
// EventParams defines the supported event parameters.
// Exported fields describe the event, unexported fields track the state of
// the rule execution.
type EventParams struct {
	Name   string
	Groups []sdk.GroupMapping
	Event  string
	// Status is 1 for OK, any other value is reported as KO.
	Status            int
	VirtualPath       string
	FsPath            string
	VirtualTargetPath string
	FsTargetPath      string
	ObjectName        string
	Extension         string
	ObjectType        string
	FileSize          int64
	Elapsed           int64
	Protocol          string
	IP                string
	Role              string
	Email             string
	Timestamp         time.Time
	// UID is generated by addUID if not already set.
	UID string
	// IDPCustomFields holds the string-valued custom fields received from
	// the identity provider, nil if there are none.
	IDPCustomFields *map[string]string
	Object          plugin.Renderer
	Metadata        map[string]string
	// sender is set by the event handlers, an empty sender makes getUsers
	// return all the users.
	sender string
	// updateStatusFromError makes AddError change Status from 1 to 2.
	updateStatusFromError bool
	// errors collects the messages of the errors passed to AddError.
	errors          []string
	retentionChecks []executedRetentionCheck
}
  525. func (p *EventParams) getACopy() *EventParams {
  526. params := *p
  527. params.errors = make([]string, len(p.errors))
  528. copy(params.errors, p.errors)
  529. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  530. for _, c := range p.retentionChecks {
  531. executedCheck := executedRetentionCheck{
  532. Username: c.Username,
  533. ActionName: c.ActionName,
  534. }
  535. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  536. copy(executedCheck.Results, c.Results)
  537. retentionChecks = append(retentionChecks, executedCheck)
  538. }
  539. params.retentionChecks = retentionChecks
  540. if p.IDPCustomFields != nil {
  541. fields := make(map[string]string)
  542. for k, v := range *p.IDPCustomFields {
  543. fields[k] = v
  544. }
  545. params.IDPCustomFields = &fields
  546. }
  547. if len(params.Metadata) > 0 {
  548. metadata := make(map[string]string)
  549. for k, v := range p.Metadata {
  550. metadata[k] = v
  551. }
  552. params.Metadata = metadata
  553. }
  554. return &params
  555. }
  556. func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
  557. if customFields == nil || len(*customFields) == 0 {
  558. return
  559. }
  560. fields := make(map[string]string)
  561. for k, v := range *customFields {
  562. switch val := v.(type) {
  563. case string:
  564. fields[k] = val
  565. }
  566. }
  567. p.IDPCustomFields = &fields
  568. }
  569. // AddError adds a new error to the event params and update the status if needed
  570. func (p *EventParams) AddError(err error) {
  571. if err == nil {
  572. return
  573. }
  574. if p.updateStatusFromError && p.Status == 1 {
  575. p.Status = 2
  576. }
  577. p.errors = append(p.errors, err.Error())
  578. }
  579. func (p *EventParams) addUID() {
  580. if p.UID == "" {
  581. p.UID = util.GenerateUniqueID()
  582. }
  583. }
  584. func (p *EventParams) setBackupParams(backupPath string) {
  585. if p.sender != "" {
  586. return
  587. }
  588. p.sender = dataprovider.ActionExecutorSystem
  589. p.FsPath = backupPath
  590. p.ObjectName = filepath.Base(backupPath)
  591. p.VirtualPath = "/" + p.ObjectName
  592. p.Timestamp = time.Now()
  593. info, err := os.Stat(backupPath)
  594. if err == nil {
  595. p.FileSize = info.Size()
  596. }
  597. }
  598. func (p *EventParams) getStatusString() string {
  599. switch p.Status {
  600. case 1:
  601. return "OK"
  602. default:
  603. return "KO"
  604. }
  605. }
  606. // getUsers returns users with group settings not applied
  607. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  608. if p.sender == "" {
  609. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeUsers})
  610. if err != nil {
  611. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  612. return nil, errors.New("unable to get users")
  613. }
  614. return dump.Users, nil
  615. }
  616. user, err := p.getUserFromSender()
  617. if err != nil {
  618. return nil, err
  619. }
  620. return []dataprovider.User{user}, nil
  621. }
  622. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  623. if p.sender == dataprovider.ActionExecutorSystem {
  624. return dataprovider.User{
  625. BaseUser: sdk.BaseUser{
  626. Status: 1,
  627. Username: p.sender,
  628. HomeDir: dataprovider.GetBackupsPath(),
  629. Permissions: map[string][]string{
  630. "/": {dataprovider.PermAny},
  631. },
  632. },
  633. }, nil
  634. }
  635. user, err := dataprovider.UserExists(p.sender, "")
  636. if err != nil {
  637. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  638. return user, fmt.Errorf("error getting user %q", p.sender)
  639. }
  640. return user, nil
  641. }
  642. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  643. if p.sender == "" {
  644. dump, err := dataprovider.DumpData([]string{dataprovider.DumpScopeFolders})
  645. return dump.Folders, err
  646. }
  647. folder, err := dataprovider.GetFolderByName(p.sender)
  648. if err != nil {
  649. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  650. }
  651. return []vfs.BaseVirtualFolder{folder}, nil
  652. }
  653. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  654. if len(p.retentionChecks) == 0 {
  655. return nil, errors.New("no data retention report available")
  656. }
  657. var b bytes.Buffer
  658. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  659. return nil, err
  660. }
  661. return b.Bytes(), nil
  662. }
  663. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  664. var n int64
  665. wr := zip.NewWriter(w)
  666. for _, check := range p.retentionChecks {
  667. data, err := getCSVRetentionReport(check.Results)
  668. if err != nil {
  669. return n, fmt.Errorf("unable to get CSV report: %w", err)
  670. }
  671. dataSize := int64(len(data))
  672. n += dataSize
  673. // we suppose a 3:1 compression ratio
  674. if n > (maxAttachmentsSize * 3) {
  675. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  676. util.ByteCountIEC(n))
  677. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  678. }
  679. fh := &zip.FileHeader{
  680. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  681. Method: zip.Deflate,
  682. Modified: time.Now().UTC(),
  683. }
  684. f, err := wr.CreateHeader(fh)
  685. if err != nil {
  686. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  687. }
  688. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  689. if err != nil {
  690. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  691. }
  692. }
  693. if err := wr.Close(); err != nil {
  694. return n, fmt.Errorf("unable to close zip writer: %w", err)
  695. }
  696. return n, nil
  697. }
  698. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  699. if len(p.retentionChecks) == 0 {
  700. return nil, errors.New("no data retention report available")
  701. }
  702. return &mail.File{
  703. Name: "retention-reports.zip",
  704. Header: make(map[string][]string),
  705. Writer: p.writeCompressedDataRetentionReports,
  706. }, nil
  707. }
  708. func (*EventParams) getStringReplacement(val string, jsonEscaped bool) string {
  709. if jsonEscaped {
  710. return util.JSONEscape(val)
  711. }
  712. return val
  713. }
  714. func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []string {
  715. dateTimeString := p.Timestamp.UTC().Format(dateTimeMillisFormat)
  716. year := dateTimeString[0:4]
  717. month := dateTimeString[5:7]
  718. day := dateTimeString[8:10]
  719. hour := dateTimeString[11:13]
  720. minute := dateTimeString[14:16]
  721. replacements := []string{
  722. "{{Name}}", p.getStringReplacement(p.Name, jsonEscaped),
  723. "{{Event}}", p.Event,
  724. "{{Status}}", fmt.Sprintf("%d", p.Status),
  725. "{{VirtualPath}}", p.getStringReplacement(p.VirtualPath, jsonEscaped),
  726. "{{EscapedVirtualPath}}", p.getStringReplacement(url.QueryEscape(p.VirtualPath), jsonEscaped),
  727. "{{FsPath}}", p.getStringReplacement(p.FsPath, jsonEscaped),
  728. "{{VirtualTargetPath}}", p.getStringReplacement(p.VirtualTargetPath, jsonEscaped),
  729. "{{FsTargetPath}}", p.getStringReplacement(p.FsTargetPath, jsonEscaped),
  730. "{{ObjectName}}", p.getStringReplacement(p.ObjectName, jsonEscaped),
  731. "{{ObjectType}}", p.ObjectType,
  732. "{{FileSize}}", strconv.FormatInt(p.FileSize, 10),
  733. "{{Elapsed}}", strconv.FormatInt(p.Elapsed, 10),
  734. "{{Protocol}}", p.Protocol,
  735. "{{IP}}", p.IP,
  736. "{{Role}}", p.getStringReplacement(p.Role, jsonEscaped),
  737. "{{Email}}", p.getStringReplacement(p.Email, jsonEscaped),
  738. "{{Timestamp}}", strconv.FormatInt(p.Timestamp.UnixNano(), 10),
  739. "{{DateTime}}", dateTimeString,
  740. "{{Year}}", year,
  741. "{{Month}}", month,
  742. "{{Day}}", day,
  743. "{{Hour}}", hour,
  744. "{{Minute}}", minute,
  745. "{{StatusString}}", p.getStatusString(),
  746. "{{UID}}", p.getStringReplacement(p.UID, jsonEscaped),
  747. "{{Ext}}", p.getStringReplacement(p.Extension, jsonEscaped),
  748. }
  749. if p.VirtualPath != "" {
  750. replacements = append(replacements, "{{VirtualDirPath}}", p.getStringReplacement(path.Dir(p.VirtualPath), jsonEscaped))
  751. }
  752. if p.VirtualTargetPath != "" {
  753. replacements = append(replacements, "{{VirtualTargetDirPath}}", p.getStringReplacement(path.Dir(p.VirtualTargetPath), jsonEscaped))
  754. replacements = append(replacements, "{{TargetName}}", p.getStringReplacement(path.Base(p.VirtualTargetPath), jsonEscaped))
  755. }
  756. if len(p.errors) > 0 {
  757. replacements = append(replacements, "{{ErrorString}}", p.getStringReplacement(strings.Join(p.errors, ", "), jsonEscaped))
  758. } else {
  759. replacements = append(replacements, "{{ErrorString}}", "")
  760. }
  761. replacements = append(replacements, objDataPlaceholder, "{}")
  762. replacements = append(replacements, objDataPlaceholderString, "")
  763. if addObjectData {
  764. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  765. if err == nil {
  766. dataString := util.BytesToString(data)
  767. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  768. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  769. }
  770. }
  771. if p.IDPCustomFields != nil {
  772. for k, v := range *p.IDPCustomFields {
  773. replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
  774. }
  775. }
  776. replacements = append(replacements, "{{Metadata}}", "{}")
  777. replacements = append(replacements, "{{MetadataString}}", "")
  778. if len(p.Metadata) > 0 {
  779. data, err := json.Marshal(p.Metadata)
  780. if err == nil {
  781. dataString := util.BytesToString(data)
  782. replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
  783. replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
  784. }
  785. }
  786. return replacements
  787. }
  788. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  789. var b bytes.Buffer
  790. csvWriter := csv.NewWriter(&b)
  791. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  792. "elapsed (ms)", "info", "error"})
  793. if err != nil {
  794. return nil, err
  795. }
  796. for _, result := range results {
  797. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  798. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  799. result.Info, result.Error})
  800. if err != nil {
  801. return nil, err
  802. }
  803. }
  804. csvWriter.Flush()
  805. err = csvWriter.Error()
  806. return b.Bytes(), err
  807. }
// closeWriterAndUpdateQuota closes w and performs the post-write bookkeeping:
// partial file cleanup, quota update, command log and action notification.
//
// The written file is virtualTargetPath if set, virtualSourcePath otherwise.
// If the transfer failed (errTransfer != nil) and a target path is set, a
// best-effort removal of the partial target is attempted. The quota is then
// updated using the current size of the written file minus truncatedSize.
//
// It returns errTransfer if not nil, otherwise the error from closing w.
func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
	numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
) error {
	var fsDstPath string
	var errDstFs error
	errWrite := w.Close()
	targetPath := virtualSourcePath
	if virtualTargetPath != "" {
		targetPath = virtualTargetPath
		var fsDst vfs.Fs
		fsDst, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
		if errTransfer != nil && errDstFs == nil {
			// try to remove a partial file on error. If this fails, we can't do anything
			errRemove := fsDst.Remove(fsDstPath, false)
			conn.Log(logger.LevelDebug, "removing partial file %q after write error, result: %v", virtualTargetPath, errRemove)
		}
	}
	// the quota can only be updated if the written file can be stat'ed
	info, err := conn.doStatInternal(targetPath, 0, false, false)
	if err == nil {
		updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
		var fsSrcPath string
		var errSrcFs error
		if virtualSourcePath != "" {
			_, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
		}
		// log and notify only if both the source and the target path were resolved
		if errSrcFs == nil && errDstFs == nil {
			// elapsed time in milliseconds
			elapsed := time.Since(startTime).Nanoseconds() / 1000000
			if errTransfer == nil {
				// no transfer error: report the close error, if any
				errTransfer = errWrite
			}
			if operation == operationCopy {
				logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
					"", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
			}
			ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed, nil) //nolint:errcheck
		}
	} else {
		eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
	}
	if errTransfer != nil {
		return errTransfer
	}
	return errWrite
}
  852. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  853. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  854. if err != nil {
  855. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  856. return
  857. }
  858. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  859. if vfolder.IsIncludedInUserQuota() {
  860. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  861. }
  862. }
  863. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  864. if numFiles == 0 {
  865. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  866. return conn.GetPermissionDeniedError()
  867. }
  868. } else {
  869. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  870. return conn.GetPermissionDeniedError()
  871. }
  872. }
  873. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  874. if !q.HasSpace {
  875. return conn.GetQuotaExceededError()
  876. }
  877. if expectedSize != -1 {
  878. sizeDiff := expectedSize - truncatedSize
  879. if sizeDiff > 0 {
  880. remainingSize := q.GetRemainingSize()
  881. if remainingSize > 0 && remainingSize < sizeDiff {
  882. return conn.GetQuotaExceededError()
  883. }
  884. }
  885. }
  886. return nil
  887. }
// getFileWriter opens virtualPath for writing after checking permissions and
// quota.
//
// It returns the writer, the number of new files (1 if the path does not
// exist yet, 0 if it already exists), the size to subtract from the final
// file size when updating the quota (truncatedSize), a cancel function that
// is never nil on success, and an error.
func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
	fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
	if err != nil {
		return nil, 0, 0, nil, err
	}
	var truncatedSize, fileSize int64
	numFiles := 1
	isFileOverwrite := false

	info, err := fs.Lstat(fsPath)
	if err == nil {
		// the path already exists: directories are rejected
		fileSize = info.Size()
		if info.IsDir() {
			return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
		}
		// only an existing regular file counts as an overwrite
		if info.Mode().IsRegular() {
			isFileOverwrite = true
			truncatedSize = fileSize
		}
		numFiles = 0
	}
	// a "not exist" error is expected for new files, anything else is fatal
	if err != nil && !fs.IsNotExist(err) {
		return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
	}
	if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
		return nil, numFiles, truncatedSize, nil, err
	}
	f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1, false))
	if err != nil {
		return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
	}
	vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())

	if isFileOverwrite {
		if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
			// for these filesystems the old size is subtracted from the quota
			// right away, so nothing is left to subtract after the write
			updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
			truncatedSize = 0
		}
	}
	if cancelFn == nil {
		cancelFn = func() {}
	}
	// Create returns two candidate writers: f is used when not nil, w otherwise
	if f != nil {
		return f, numFiles, truncatedSize, cancelFn, nil
	}
	return w, numFiles, truncatedSize, cancelFn, nil
}
// addZipEntry adds entryPath to the zip archive wrapped by wr. Directories
// are added recursively, regular files are added with their content, any
// other file type is skipped. Entry names are computed relative to baseDir.
//
// info may be nil, in which case entryPath is stat'ed. recursion is the
// current depth: an error is returned once it reaches util.MaxRecursion.
// The archive itself (wr.Name) and duplicate entry names are silently skipped.
func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string, info os.FileInfo, recursion int) error { //nolint:gocyclo
	if entryPath == wr.Name {
		// skip the archive itself
		return nil
	}
	if recursion >= util.MaxRecursion {
		eventManagerLog(logger.LevelError, "unable to add zip entry %q, recursion too deep: %v", entryPath, recursion)
		return util.ErrRecursionTooDeep
	}
	recursion++
	var err error
	if info == nil {
		info, err = conn.DoStat(entryPath, 1, false)
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to add zip entry %q, stat error: %v", entryPath, err)
			return err
		}
	}
	entryName, err := getZipEntryName(entryPath, baseDir)
	if err != nil {
		eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
		return err
	}
	// wr.Entries tracks the names already added to the archive
	if _, ok := wr.Entries[entryName]; ok {
		eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
		return nil
	}
	wr.Entries[entryName] = true
	if info.IsDir() {
		// directory entries are created with a trailing slash in their name
		_, err = wr.Writer.CreateHeader(&zip.FileHeader{
			Name:     entryName + "/",
			Method:   zip.Deflate,
			Modified: info.ModTime(),
		})
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
			return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
		}
		lister, err := conn.ListDir(entryPath)
		if err != nil {
			eventManagerLog(logger.LevelError, "unable to add zip entry %q, get dir lister error: %v", entryPath, err)
			return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
		}
		defer lister.Close()

		// read the directory in batches until the lister reports io.EOF
		for {
			contents, err := lister.Next(vfs.ListerBatchSize)
			finished := errors.Is(err, io.EOF)
			if err := lister.convertError(err); err != nil {
				eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
				return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
			}
			// the entries returned together with io.EOF are processed too
			for _, info := range contents {
				fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
				if err := addZipEntry(wr, conn, fullPath, baseDir, info, recursion); err != nil {
					eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
					return err
				}
			}
			if finished {
				return nil
			}
		}
	}
	if !info.Mode().IsRegular() {
		// we only allow regular files
		eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
		return nil
	}

	return addFileToZip(wr, conn, entryPath, entryName, info.ModTime())
}
  1003. func addFileToZip(wr *zipWriterWrapper, conn *BaseConnection, entryPath, entryName string, modTime time.Time) error {
  1004. reader, cancelFn, err := getFileReader(conn, entryPath)
  1005. if err != nil {
  1006. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  1007. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  1008. }
  1009. defer cancelFn()
  1010. defer reader.Close()
  1011. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  1012. Name: entryName,
  1013. Method: zip.Deflate,
  1014. Modified: modTime,
  1015. })
  1016. if err != nil {
  1017. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  1018. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  1019. }
  1020. _, err = io.Copy(f, reader)
  1021. return err
  1022. }
  1023. func getZipEntryName(entryPath, baseDir string) (string, error) {
  1024. if !strings.HasPrefix(entryPath, baseDir) {
  1025. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  1026. }
  1027. entryPath = strings.TrimPrefix(entryPath, baseDir)
  1028. return strings.TrimPrefix(entryPath, "/"), nil
  1029. }
  1030. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  1031. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  1032. return nil, nil, conn.GetPermissionDeniedError()
  1033. }
  1034. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  1035. if err != nil {
  1036. return nil, nil, err
  1037. }
  1038. f, r, cancelFn, err := fs.Open(fsPath, 0)
  1039. if err != nil {
  1040. return nil, nil, conn.GetFsError(fs, err)
  1041. }
  1042. if cancelFn == nil {
  1043. cancelFn = func() {}
  1044. }
  1045. if f != nil {
  1046. return f, cancelFn, nil
  1047. }
  1048. return r, cancelFn, nil
  1049. }
  1050. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  1051. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1052. if err != nil {
  1053. return err
  1054. }
  1055. defer cancelFn()
  1056. defer reader.Close()
  1057. _, err = io.Copy(w, reader)
  1058. return err
  1059. }
  1060. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  1061. return func(w io.Writer) (int64, error) {
  1062. reader, cancelFn, err := getFileReader(conn, virtualPath)
  1063. if err != nil {
  1064. return 0, err
  1065. }
  1066. defer cancelFn()
  1067. defer reader.Close()
  1068. return io.CopyN(w, reader, size)
  1069. }
  1070. }
  1071. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  1072. var files []*mail.File
  1073. totalSize := int64(0)
  1074. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  1075. info, err := conn.DoStat(virtualPath, 0, false)
  1076. if err != nil {
  1077. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  1078. }
  1079. if !info.Mode().IsRegular() {
  1080. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  1081. }
  1082. totalSize += info.Size()
  1083. if totalSize > maxAttachmentsSize {
  1084. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  1085. }
  1086. files = append(files, &mail.File{
  1087. Name: path.Base(virtualPath),
  1088. Header: make(map[string][]string),
  1089. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  1090. })
  1091. }
  1092. return files, nil
  1093. }
  1094. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  1095. if !strings.Contains(input, "{{") {
  1096. return input
  1097. }
  1098. return replacer.Replace(input)
  1099. }
  1100. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  1101. var matched bool
  1102. var err error
  1103. if strings.Contains(p.Pattern, "**") {
  1104. matched, err = doublestar.Match(p.Pattern, name)
  1105. } else {
  1106. matched, err = path.Match(p.Pattern, name)
  1107. }
  1108. if err != nil {
  1109. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  1110. return false
  1111. }
  1112. if p.InverseMatch {
  1113. return !matched
  1114. }
  1115. return matched
  1116. }
  1117. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  1118. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  1119. return false
  1120. }
  1121. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  1122. return false
  1123. }
  1124. if !checkEventGroupConditionPatterns(user.Groups, conditions.GroupNames) {
  1125. return false
  1126. }
  1127. return true
  1128. }
  1129. // checkEventConditionPatterns returns false if patterns are defined and no match is found
  1130. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  1131. if len(patterns) == 0 {
  1132. return true
  1133. }
  1134. matches := false
  1135. for _, p := range patterns {
  1136. // assume, that multiple InverseMatches are set
  1137. if p.InverseMatch {
  1138. if checkEventConditionPattern(p, name) {
  1139. matches = true
  1140. } else {
  1141. return false
  1142. }
  1143. } else if checkEventConditionPattern(p, name) {
  1144. return true
  1145. }
  1146. }
  1147. return matches
  1148. }
  1149. func checkEventGroupConditionPatterns(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  1150. if len(patterns) == 0 {
  1151. return true
  1152. }
  1153. matches := false
  1154. for _, group := range groups {
  1155. for _, p := range patterns {
  1156. // assume, that multiple InverseMatches are set
  1157. if p.InverseMatch {
  1158. if checkEventConditionPattern(p, group.Name) {
  1159. matches = true
  1160. } else {
  1161. return false
  1162. }
  1163. } else {
  1164. if checkEventConditionPattern(p, group.Name) {
  1165. return true
  1166. }
  1167. }
  1168. }
  1169. }
  1170. return matches
  1171. }
  1172. func getHTTPRuleActionEndpoint(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  1173. u, err := url.Parse(c.Endpoint)
  1174. if err != nil {
  1175. return "", fmt.Errorf("invalid endpoint: %w", err)
  1176. }
  1177. if strings.Contains(u.Path, "{{") {
  1178. pathComponents := strings.Split(u.Path, "/")
  1179. for idx := range pathComponents {
  1180. part := replaceWithReplacer(pathComponents[idx], replacer)
  1181. if part != pathComponents[idx] {
  1182. pathComponents[idx] = url.PathEscape(part)
  1183. }
  1184. }
  1185. u.Path = ""
  1186. u = u.JoinPath(pathComponents...)
  1187. }
  1188. if len(c.QueryParameters) > 0 {
  1189. q := u.Query()
  1190. for _, keyVal := range c.QueryParameters {
  1191. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1192. }
  1193. u.RawQuery = q.Encode()
  1194. }
  1195. return u.String(), nil
  1196. }
  1197. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  1198. conn *BaseConnection, replacer *strings.Replacer, params *EventParams, addObjectData bool,
  1199. ) error {
  1200. partWriter, err := m.CreatePart(h)
  1201. if err != nil {
  1202. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  1203. return err
  1204. }
  1205. if part.Body != "" {
  1206. cType := h.Get("Content-Type")
  1207. if strings.Contains(strings.ToLower(cType), "application/json") {
  1208. replacements := params.getStringReplacements(addObjectData, true)
  1209. jsonReplacer := strings.NewReplacer(replacements...)
  1210. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, jsonReplacer)))
  1211. } else {
  1212. _, err = partWriter.Write(util.StringToBytes(replaceWithReplacer(part.Body, replacer)))
  1213. }
  1214. if err != nil {
  1215. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1216. return err
  1217. }
  1218. return nil
  1219. }
  1220. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  1221. data, err := params.getCompressedDataRetentionReport()
  1222. if err != nil {
  1223. return err
  1224. }
  1225. _, err = partWriter.Write(data)
  1226. if err != nil {
  1227. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  1228. return err
  1229. }
  1230. return nil
  1231. }
  1232. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1233. if err != nil {
  1234. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1235. return err
  1236. }
  1237. return nil
  1238. }
// getHTTPRuleActionBody returns the request body for the HTTP action and, for
// multipart requests, the related Content-Type.
//
// No body is returned for GET requests. A configured Body takes precedence
// over Parts: it is either the compressed retention report or the body with
// the placeholders replaced (using JSON-escaped values for JSON bodies).
// Parts are streamed through a pipe by a goroutine that calls cancel if
// writing a part fails.
func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer, //nolint:gocyclo
	cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
) (io.Reader, string, error) {
	var body io.Reader
	if c.Method == http.MethodGet {
		return body, "", nil
	}
	if c.Body != "" {
		if c.Body == dataprovider.RetentionReportPlaceHolder {
			data, err := params.getCompressedDataRetentionReport()
			if err != nil {
				return body, "", err
			}
			return bytes.NewBuffer(data), "", nil
		}
		if c.HasJSONBody() {
			replacements := params.getStringReplacements(addObjectData, true)
			jsonReplacer := strings.NewReplacer(replacements...)
			return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
		}
		return bytes.NewBufferString(replaceWithReplacer(c.Body, replacer)), "", nil
	}
	if len(c.Parts) > 0 {
		// the multipart body is written to w by the goroutine below and read
		// from r by the consumer of the returned reader
		r, w := io.Pipe()
		m := multipart.NewWriter(w)

		var conn *BaseConnection
		// a connection is only set up when a user is given
		if user.Username != "" {
			var err error
			user, err = getUserForEventAction(user)
			if err != nil {
				return body, "", err
			}
			connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
			err = user.CheckFsRoot(connectionID)
			if err != nil {
				user.CloseFs() //nolint:errcheck
				return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
					user.Username, err)
			}
			conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
		}

		go func() {
			// deferred calls run in reverse order: the connection and user
			// filesystems are closed first, the pipe writer last
			defer w.Close()
			defer user.CloseFs() //nolint:errcheck
			if conn != nil {
				defer conn.CloseFS() //nolint:errcheck
			}

			for _, part := range c.Parts {
				h := make(textproto.MIMEHeader)
				if part.Body != "" {
					h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
				} else {
					// file part: the file name is the base name of the path
					// after replacing the placeholders
					h.Set("Content-Disposition",
						fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
							multipartQuoteEscaper.Replace(part.Name),
							multipartQuoteEscaper.Replace((path.Base(replaceWithReplacer(part.Filepath, replacer))))))
					// the content type is guessed from the extension of the
					// configured path, before placeholder replacement
					contentType := mime.TypeByExtension(path.Ext(part.Filepath))
					if contentType == "" {
						contentType = "application/octet-stream"
					}
					h.Set("Content-Type", contentType)
				}
				// part headers are set last, so they replace the ones set above
				for _, keyVal := range part.Headers {
					h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
				}
				if err := writeHTTPPart(m, part, h, conn, replacer, params, addObjectData); err != nil {
					// cancel is invoked so the caller stops waiting for the
					// rest of the body
					cancel()
					return
				}
			}
			m.Close()
		}()

		return r, m.FormDataContentType(), nil
	}
	return body, "", nil
}
  1315. func setHTTPReqHeaders(req *http.Request, c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1316. contentType string,
  1317. ) {
  1318. if contentType != "" {
  1319. req.Header.Set("Content-Type", contentType)
  1320. }
  1321. if c.Username != "" || c.Password.GetPayload() != "" {
  1322. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1323. }
  1324. for _, keyVal := range c.Headers {
  1325. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1326. }
  1327. }
// executeHTTPRuleAction sends the HTTP request defined by c, using params to
// replace the placeholders in the endpoint, headers and body.
//
// It returns an error if the request cannot be built or sent, or if the
// response status code is lower than 200 (http.StatusOK) or greater than 204
// (http.StatusNoContent).
func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
	if err := c.TryDecryptPassword(); err != nil {
		return err
	}
	// the object data is only considered if an object is available
	addObjectData := false
	if params.Object != nil {
		addObjectData = c.HasObjectData()
	}

	replacements := params.getStringReplacements(addObjectData, false)
	replacer := strings.NewReplacer(replacements...)
	endpoint, err := getHTTPRuleActionEndpoint(&c, replacer)
	if err != nil {
		return err
	}

	ctx, cancel := c.GetContext()
	defer cancel()

	// the user is only needed to read the files to send as multipart parts
	var user dataprovider.User
	if c.HasMultipartFiles() {
		user, err = params.getUserFromSender()
		if err != nil {
			return err
		}
	}
	body, contentType, err := getHTTPRuleActionBody(&c, replacer, cancel, user, params, addObjectData)
	if err != nil {
		return err
	}
	// close the body on return if it is closable
	if body != nil {
		rc, ok := body.(io.ReadCloser)
		if ok {
			defer rc.Close()
		}
	}
	req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
	if err != nil {
		return err
	}
	setHTTPReqHeaders(req, &c, replacer, contentType)

	client := c.GetHTTPClient()
	defer client.CloseIdleConnections()

	startTime := time.Now()
	resp, err := client.Do(req)
	if err != nil {
		eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
			endpoint, time.Since(startTime), err)
		return fmt.Errorf("error sending HTTP request: %w", err)
	}
	defer resp.Body.Close()

	eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
		endpoint, time.Since(startTime), resp.StatusCode)
	if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
		// log at most the first 2048 bytes of the error response
		if rb, err := io.ReadAll(io.LimitReader(resp.Body, 2048)); err == nil {
			eventManagerLog(logger.LevelDebug, "error notification response from endpoint %q: %s",
				endpoint, util.BytesToString(rb))
		}
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	return nil
}
  1387. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1388. if !dataprovider.IsActionCommandAllowed(c.Cmd) {
  1389. return fmt.Errorf("command %q is not allowed", c.Cmd)
  1390. }
  1391. addObjectData := false
  1392. if params.Object != nil {
  1393. for _, k := range c.EnvVars {
  1394. if strings.Contains(k.Value, objDataPlaceholder) || strings.Contains(k.Value, objDataPlaceholderString) {
  1395. addObjectData = true
  1396. break
  1397. }
  1398. }
  1399. }
  1400. replacements := params.getStringReplacements(addObjectData, false)
  1401. replacer := strings.NewReplacer(replacements...)
  1402. args := make([]string, 0, len(c.Args))
  1403. for _, arg := range c.Args {
  1404. args = append(args, replaceWithReplacer(arg, replacer))
  1405. }
  1406. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1407. defer cancel()
  1408. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1409. cmd.Env = []string{}
  1410. for _, keyVal := range c.EnvVars {
  1411. if keyVal.Value == "$" && !strings.HasPrefix(strings.ToUpper(keyVal.Key), "SFTPGO_") {
  1412. val := os.Getenv(keyVal.Key)
  1413. if val == "" {
  1414. eventManagerLog(logger.LevelDebug, "empty value for environment variable %q", keyVal.Key)
  1415. }
  1416. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, val))
  1417. } else {
  1418. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1419. }
  1420. }
  1421. startTime := time.Now()
  1422. err := cmd.Run()
  1423. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1424. c.Cmd, time.Since(startTime), err)
  1425. return err
  1426. }
  1427. func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) []string {
  1428. if len(addrs) == 0 {
  1429. return nil
  1430. }
  1431. recipients := make([]string, 0, len(addrs))
  1432. for _, recipient := range addrs {
  1433. rcpt := replaceWithReplacer(recipient, replacer)
  1434. if rcpt != "" {
  1435. recipients = append(recipients, rcpt)
  1436. }
  1437. }
  1438. return recipients
  1439. }
  1440. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1441. addObjectData := false
  1442. if params.Object != nil {
  1443. if strings.Contains(c.Body, objDataPlaceholder) || strings.Contains(c.Body, objDataPlaceholderString) {
  1444. addObjectData = true
  1445. }
  1446. }
  1447. replacements := params.getStringReplacements(addObjectData, false)
  1448. replacer := strings.NewReplacer(replacements...)
  1449. body := replaceWithReplacer(c.Body, replacer)
  1450. subject := replaceWithReplacer(c.Subject, replacer)
  1451. recipients := getEmailAddressesWithReplacer(c.Recipients, replacer)
  1452. bcc := getEmailAddressesWithReplacer(c.Bcc, replacer)
  1453. startTime := time.Now()
  1454. var files []*mail.File
  1455. fileAttachments := make([]string, 0, len(c.Attachments))
  1456. for _, attachment := range c.Attachments {
  1457. if attachment == dataprovider.RetentionReportPlaceHolder {
  1458. f, err := params.getRetentionReportsAsMailAttachment()
  1459. if err != nil {
  1460. return err
  1461. }
  1462. files = append(files, f)
  1463. continue
  1464. }
  1465. fileAttachments = append(fileAttachments, attachment)
  1466. }
  1467. if len(fileAttachments) > 0 {
  1468. user, err := params.getUserFromSender()
  1469. if err != nil {
  1470. return err
  1471. }
  1472. user, err = getUserForEventAction(user)
  1473. if err != nil {
  1474. return err
  1475. }
  1476. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1477. err = user.CheckFsRoot(connectionID)
  1478. defer user.CloseFs() //nolint:errcheck
  1479. if err != nil {
  1480. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1481. }
  1482. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1483. defer conn.CloseFS() //nolint:errcheck
  1484. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1485. if err != nil {
  1486. return err
  1487. }
  1488. files = append(files, res...)
  1489. }
  1490. err := smtp.SendEmail(recipients, bcc, subject, body, smtp.EmailContentType(c.ContentType), files...)
  1491. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1492. time.Since(startTime), err)
  1493. if err != nil {
  1494. return fmt.Errorf("unable to send email: %w", err)
  1495. }
  1496. return nil
  1497. }
  1498. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1499. err := user.LoadAndApplyGroupSettings()
  1500. if err != nil {
  1501. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1502. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1503. }
  1504. user.UploadDataTransfer = 0
  1505. user.UploadBandwidth = 0
  1506. user.DownloadBandwidth = 0
  1507. user.Filters.DisableFsChecks = false
  1508. user.Filters.FilePatterns = nil
  1509. user.Filters.BandwidthLimits = nil
  1510. for k := range user.Permissions {
  1511. user.Permissions[k] = []string{dataprovider.PermAny}
  1512. }
  1513. return user, nil
  1514. }
  1515. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1516. results := make([]string, 0, len(paths))
  1517. for _, p := range paths {
  1518. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1519. }
  1520. return util.RemoveDuplicates(results, false)
  1521. }
  1522. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1523. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1524. if err != nil {
  1525. return err
  1526. }
  1527. return conn.RemoveFile(fs, fsPath, item, info)
  1528. }
  1529. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1530. user, err := getUserForEventAction(user)
  1531. if err != nil {
  1532. return err
  1533. }
  1534. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1535. err = user.CheckFsRoot(connectionID)
  1536. defer user.CloseFs() //nolint:errcheck
  1537. if err != nil {
  1538. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1539. }
  1540. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1541. defer conn.CloseFS() //nolint:errcheck
  1542. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1543. info, err := conn.DoStat(item, 0, false)
  1544. if err != nil {
  1545. if conn.IsNotExistError(err) {
  1546. continue
  1547. }
  1548. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1549. }
  1550. if info.IsDir() {
  1551. if err = conn.RemoveDir(item); err != nil {
  1552. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1553. }
  1554. } else {
  1555. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1556. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1557. }
  1558. }
  1559. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1560. }
  1561. return nil
  1562. }
  1563. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1564. conditions dataprovider.ConditionOptions, params *EventParams,
  1565. ) error {
  1566. users, err := params.getUsers()
  1567. if err != nil {
  1568. return fmt.Errorf("unable to get users: %w", err)
  1569. }
  1570. var failures []string
  1571. executed := 0
  1572. for _, user := range users {
  1573. // if sender is set, the conditions have already been evaluated
  1574. if params.sender == "" {
  1575. if !checkUserConditionOptions(&user, &conditions) {
  1576. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1577. user.Username)
  1578. continue
  1579. }
  1580. }
  1581. executed++
  1582. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1583. params.AddError(err)
  1584. failures = append(failures, user.Username)
  1585. }
  1586. }
  1587. if len(failures) > 0 {
  1588. return fmt.Errorf("fs delete failed for users: %s", strings.Join(failures, ", "))
  1589. }
  1590. if executed == 0 {
  1591. eventManagerLog(logger.LevelError, "no delete executed")
  1592. return errors.New("no delete executed")
  1593. }
  1594. return nil
  1595. }
  1596. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1597. user, err := getUserForEventAction(user)
  1598. if err != nil {
  1599. return err
  1600. }
  1601. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1602. err = user.CheckFsRoot(connectionID)
  1603. defer user.CloseFs() //nolint:errcheck
  1604. if err != nil {
  1605. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1606. }
  1607. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1608. defer conn.CloseFS() //nolint:errcheck
  1609. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1610. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1611. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1612. }
  1613. if err = conn.createDirIfMissing(item); err != nil {
  1614. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1615. }
  1616. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1617. }
  1618. return nil
  1619. }
  1620. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1621. conditions dataprovider.ConditionOptions, params *EventParams,
  1622. ) error {
  1623. users, err := params.getUsers()
  1624. if err != nil {
  1625. return fmt.Errorf("unable to get users: %w", err)
  1626. }
  1627. var failures []string
  1628. executed := 0
  1629. for _, user := range users {
  1630. // if sender is set, the conditions have already been evaluated
  1631. if params.sender == "" {
  1632. if !checkUserConditionOptions(&user, &conditions) {
  1633. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1634. user.Username)
  1635. continue
  1636. }
  1637. }
  1638. executed++
  1639. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1640. failures = append(failures, user.Username)
  1641. }
  1642. }
  1643. if len(failures) > 0 {
  1644. return fmt.Errorf("fs mkdir failed for users: %s", strings.Join(failures, ", "))
  1645. }
  1646. if executed == 0 {
  1647. eventManagerLog(logger.LevelError, "no mkdir executed")
  1648. return errors.New("no mkdir executed")
  1649. }
  1650. return nil
  1651. }
  1652. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1653. user dataprovider.User,
  1654. ) error {
  1655. user, err := getUserForEventAction(user)
  1656. if err != nil {
  1657. return err
  1658. }
  1659. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1660. err = user.CheckFsRoot(connectionID)
  1661. defer user.CloseFs() //nolint:errcheck
  1662. if err != nil {
  1663. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1664. }
  1665. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1666. defer conn.CloseFS() //nolint:errcheck
  1667. for _, item := range renames {
  1668. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1669. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1670. if err = conn.renameInternal(source, target, true); err != nil {
  1671. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1672. }
  1673. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1674. }
  1675. return nil
  1676. }
  1677. func executeCopyFsActionForUser(keyVals []dataprovider.KeyValue, replacer *strings.Replacer,
  1678. user dataprovider.User,
  1679. ) error {
  1680. user, err := getUserForEventAction(user)
  1681. if err != nil {
  1682. return err
  1683. }
  1684. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1685. err = user.CheckFsRoot(connectionID)
  1686. defer user.CloseFs() //nolint:errcheck
  1687. if err != nil {
  1688. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1689. }
  1690. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1691. defer conn.CloseFS() //nolint:errcheck
  1692. for _, item := range keyVals {
  1693. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1694. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1695. if strings.HasSuffix(item.Key, "/") {
  1696. source += "/"
  1697. }
  1698. if strings.HasSuffix(item.Value, "/") {
  1699. target += "/"
  1700. }
  1701. if err = conn.Copy(source, target); err != nil {
  1702. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1703. }
  1704. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1705. }
  1706. return nil
  1707. }
  1708. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1709. user dataprovider.User,
  1710. ) error {
  1711. user, err := getUserForEventAction(user)
  1712. if err != nil {
  1713. return err
  1714. }
  1715. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1716. err = user.CheckFsRoot(connectionID)
  1717. defer user.CloseFs() //nolint:errcheck
  1718. if err != nil {
  1719. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1720. }
  1721. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1722. defer conn.CloseFS() //nolint:errcheck
  1723. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1724. if _, err = conn.DoStat(item, 0, false); err != nil {
  1725. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1726. }
  1727. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1728. }
  1729. return nil
  1730. }
  1731. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1732. conditions dataprovider.ConditionOptions, params *EventParams,
  1733. ) error {
  1734. users, err := params.getUsers()
  1735. if err != nil {
  1736. return fmt.Errorf("unable to get users: %w", err)
  1737. }
  1738. var failures []string
  1739. executed := 0
  1740. for _, user := range users {
  1741. // if sender is set, the conditions have already been evaluated
  1742. if params.sender == "" {
  1743. if !checkUserConditionOptions(&user, &conditions) {
  1744. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1745. user.Username)
  1746. continue
  1747. }
  1748. }
  1749. executed++
  1750. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1751. failures = append(failures, user.Username)
  1752. params.AddError(err)
  1753. }
  1754. }
  1755. if len(failures) > 0 {
  1756. return fmt.Errorf("fs rename failed for users: %s", strings.Join(failures, ", "))
  1757. }
  1758. if executed == 0 {
  1759. eventManagerLog(logger.LevelError, "no rename executed")
  1760. return errors.New("no rename executed")
  1761. }
  1762. return nil
  1763. }
  1764. func executeCopyFsRuleAction(keyVals []dataprovider.KeyValue, replacer *strings.Replacer,
  1765. conditions dataprovider.ConditionOptions, params *EventParams,
  1766. ) error {
  1767. users, err := params.getUsers()
  1768. if err != nil {
  1769. return fmt.Errorf("unable to get users: %w", err)
  1770. }
  1771. var failures []string
  1772. var executed int
  1773. for _, user := range users {
  1774. // if sender is set, the conditions have already been evaluated
  1775. if params.sender == "" {
  1776. if !checkUserConditionOptions(&user, &conditions) {
  1777. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1778. user.Username)
  1779. continue
  1780. }
  1781. }
  1782. executed++
  1783. if err = executeCopyFsActionForUser(keyVals, replacer, user); err != nil {
  1784. failures = append(failures, user.Username)
  1785. params.AddError(err)
  1786. }
  1787. }
  1788. if len(failures) > 0 {
  1789. return fmt.Errorf("fs copy failed for users: %s", strings.Join(failures, ", "))
  1790. }
  1791. if executed == 0 {
  1792. eventManagerLog(logger.LevelError, "no copy executed")
  1793. return errors.New("no copy executed")
  1794. }
  1795. return nil
  1796. }
  1797. func getArchiveBaseDir(paths []string) string {
  1798. var parentDirs []string
  1799. for _, p := range paths {
  1800. parentDirs = append(parentDirs, path.Dir(p))
  1801. }
  1802. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1803. baseDir := "/"
  1804. if len(parentDirs) == 1 {
  1805. baseDir = parentDirs[0]
  1806. }
  1807. return baseDir
  1808. }
  1809. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1810. if info.IsDir() {
  1811. var dirSize int64
  1812. lister, err := conn.ListDir(p)
  1813. if err != nil {
  1814. return 0, err
  1815. }
  1816. defer lister.Close()
  1817. for {
  1818. entries, err := lister.Next(vfs.ListerBatchSize)
  1819. finished := errors.Is(err, io.EOF)
  1820. if err != nil && !finished {
  1821. return 0, err
  1822. }
  1823. for _, entry := range entries {
  1824. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1825. if err != nil {
  1826. return 0, err
  1827. }
  1828. dirSize += size
  1829. }
  1830. if finished {
  1831. return dirSize, nil
  1832. }
  1833. }
  1834. }
  1835. if info.Mode().IsRegular() {
  1836. return info.Size(), nil
  1837. }
  1838. return 0, nil
  1839. }
  1840. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1841. q, _ := conn.HasSpace(false, false, zipPath)
  1842. if q.HasSpace && q.GetRemainingSize() > 0 {
  1843. var size int64
  1844. for _, item := range paths {
  1845. info, err := conn.DoStat(item, 1, false)
  1846. if err != nil {
  1847. return size, err
  1848. }
  1849. itemSize, err := getSizeForPath(conn, item, info)
  1850. if err != nil {
  1851. return size, err
  1852. }
  1853. size += itemSize
  1854. }
  1855. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1856. // we assume the zip size will be half of the real size
  1857. return size / 2, nil
  1858. }
  1859. return -1, nil
  1860. }
  1861. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1862. user dataprovider.User,
  1863. ) error {
  1864. user, err := getUserForEventAction(user)
  1865. if err != nil {
  1866. return err
  1867. }
  1868. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1869. err = user.CheckFsRoot(connectionID)
  1870. defer user.CloseFs() //nolint:errcheck
  1871. if err != nil {
  1872. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1873. }
  1874. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1875. defer conn.CloseFS() //nolint:errcheck
  1876. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1877. conn.CheckParentDirs(path.Dir(name)) //nolint:errcheck
  1878. paths := make([]string, 0, len(c.Paths))
  1879. for idx := range c.Paths {
  1880. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1881. if p == name {
  1882. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1883. }
  1884. paths = append(paths, p)
  1885. }
  1886. paths = util.RemoveDuplicates(paths, false)
  1887. estimatedSize, err := estimateZipSize(conn, name, paths)
  1888. if err != nil {
  1889. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1890. return fmt.Errorf("unable to estimate archive size: %w", err)
  1891. }
  1892. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1893. if err != nil {
  1894. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1895. return fmt.Errorf("unable to create archive: %w", err)
  1896. }
  1897. defer cancelFn()
  1898. baseDir := getArchiveBaseDir(paths)
  1899. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1900. zipWriter := &zipWriterWrapper{
  1901. Name: name,
  1902. Writer: zip.NewWriter(writer),
  1903. Entries: make(map[string]bool),
  1904. }
  1905. startTime := time.Now()
  1906. for _, item := range paths {
  1907. if err := addZipEntry(zipWriter, conn, item, baseDir, nil, 0); err != nil {
  1908. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1909. return err
  1910. }
  1911. }
  1912. if err := zipWriter.Writer.Close(); err != nil {
  1913. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1914. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1915. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1916. }
  1917. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1918. }
  1919. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1920. params *EventParams,
  1921. ) error {
  1922. users, err := params.getUsers()
  1923. if err != nil {
  1924. return fmt.Errorf("unable to get users: %w", err)
  1925. }
  1926. var failures []string
  1927. executed := 0
  1928. for _, user := range users {
  1929. // if sender is set, the conditions have already been evaluated
  1930. if params.sender == "" {
  1931. if !checkUserConditionOptions(&user, &conditions) {
  1932. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1933. user.Username)
  1934. continue
  1935. }
  1936. }
  1937. executed++
  1938. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1939. failures = append(failures, user.Username)
  1940. params.AddError(err)
  1941. }
  1942. }
  1943. if len(failures) > 0 {
  1944. return fmt.Errorf("fs existence check failed for users: %s", strings.Join(failures, ", "))
  1945. }
  1946. if executed == 0 {
  1947. eventManagerLog(logger.LevelError, "no existence check executed")
  1948. return errors.New("no existence check executed")
  1949. }
  1950. return nil
  1951. }
  1952. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1953. conditions dataprovider.ConditionOptions, params *EventParams,
  1954. ) error {
  1955. users, err := params.getUsers()
  1956. if err != nil {
  1957. return fmt.Errorf("unable to get users: %w", err)
  1958. }
  1959. var failures []string
  1960. executed := 0
  1961. for _, user := range users {
  1962. // if sender is set, the conditions have already been evaluated
  1963. if params.sender == "" {
  1964. if !checkUserConditionOptions(&user, &conditions) {
  1965. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1966. user.Username)
  1967. continue
  1968. }
  1969. }
  1970. executed++
  1971. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1972. failures = append(failures, user.Username)
  1973. params.AddError(err)
  1974. }
  1975. }
  1976. if len(failures) > 0 {
  1977. return fmt.Errorf("fs compress failed for users: %s", strings.Join(failures, ","))
  1978. }
  1979. if executed == 0 {
  1980. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1981. return errors.New("no file/folder compressed")
  1982. }
  1983. return nil
  1984. }
  1985. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1986. params *EventParams,
  1987. ) error {
  1988. addObjectData := false
  1989. replacements := params.getStringReplacements(addObjectData, false)
  1990. replacer := strings.NewReplacer(replacements...)
  1991. switch c.Type {
  1992. case dataprovider.FilesystemActionRename:
  1993. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1994. case dataprovider.FilesystemActionDelete:
  1995. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1996. case dataprovider.FilesystemActionMkdirs:
  1997. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1998. case dataprovider.FilesystemActionExist:
  1999. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  2000. case dataprovider.FilesystemActionCompress:
  2001. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  2002. case dataprovider.FilesystemActionCopy:
  2003. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  2004. default:
  2005. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  2006. }
  2007. }
  2008. func executeQuotaResetForUser(user *dataprovider.User) error {
  2009. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2010. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  2011. user.Username, err)
  2012. return err
  2013. }
  2014. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  2015. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  2016. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  2017. }
  2018. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  2019. numFiles, size, err := user.ScanQuota()
  2020. if err != nil {
  2021. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  2022. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  2023. }
  2024. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  2025. if err != nil {
  2026. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  2027. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  2028. }
  2029. return nil
  2030. }
  2031. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2032. users, err := params.getUsers()
  2033. if err != nil {
  2034. return fmt.Errorf("unable to get users: %w", err)
  2035. }
  2036. var failures []string
  2037. executed := 0
  2038. for _, user := range users {
  2039. // if sender is set, the conditions have already been evaluated
  2040. if params.sender == "" {
  2041. if !checkUserConditionOptions(&user, &conditions) {
  2042. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  2043. user.Username)
  2044. continue
  2045. }
  2046. }
  2047. executed++
  2048. if err = executeQuotaResetForUser(&user); err != nil {
  2049. params.AddError(err)
  2050. failures = append(failures, user.Username)
  2051. }
  2052. }
  2053. if len(failures) > 0 {
  2054. return fmt.Errorf("quota reset failed for users: %s", strings.Join(failures, ", "))
  2055. }
  2056. if executed == 0 {
  2057. eventManagerLog(logger.LevelError, "no user quota reset executed")
  2058. return errors.New("no user quota reset executed")
  2059. }
  2060. return nil
  2061. }
  2062. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2063. folders, err := params.getFolders()
  2064. if err != nil {
  2065. return fmt.Errorf("unable to get folders: %w", err)
  2066. }
  2067. var failures []string
  2068. executed := 0
  2069. for _, folder := range folders {
  2070. // if sender is set, the conditions have already been evaluated
  2071. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  2072. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  2073. folder.Name)
  2074. continue
  2075. }
  2076. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  2077. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  2078. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  2079. failures = append(failures, folder.Name)
  2080. continue
  2081. }
  2082. executed++
  2083. f := vfs.VirtualFolder{
  2084. BaseVirtualFolder: folder,
  2085. VirtualPath: "/",
  2086. }
  2087. numFiles, size, err := f.ScanQuota()
  2088. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  2089. if err != nil {
  2090. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  2091. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  2092. failures = append(failures, folder.Name)
  2093. continue
  2094. }
  2095. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  2096. if err != nil {
  2097. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  2098. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  2099. failures = append(failures, folder.Name)
  2100. }
  2101. }
  2102. if len(failures) > 0 {
  2103. return fmt.Errorf("quota reset failed for folders: %s", strings.Join(failures, ", "))
  2104. }
  2105. if executed == 0 {
  2106. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  2107. return errors.New("no folder quota reset executed")
  2108. }
  2109. return nil
  2110. }
  2111. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2112. users, err := params.getUsers()
  2113. if err != nil {
  2114. return fmt.Errorf("unable to get users: %w", err)
  2115. }
  2116. var failures []string
  2117. executed := 0
  2118. for _, user := range users {
  2119. // if sender is set, the conditions have already been evaluated
  2120. if params.sender == "" {
  2121. if !checkUserConditionOptions(&user, &conditions) {
  2122. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  2123. user.Username)
  2124. continue
  2125. }
  2126. }
  2127. executed++
  2128. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  2129. if err != nil {
  2130. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  2131. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  2132. failures = append(failures, user.Username)
  2133. }
  2134. }
  2135. if len(failures) > 0 {
  2136. return fmt.Errorf("transfer quota reset failed for users: %s", strings.Join(failures, ", "))
  2137. }
  2138. if executed == 0 {
  2139. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  2140. return errors.New("no transfer quota reset executed")
  2141. }
  2142. return nil
  2143. }
  2144. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  2145. params *EventParams, actionName string,
  2146. ) error {
  2147. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2148. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  2149. user.Username, err)
  2150. return err
  2151. }
  2152. check := RetentionCheck{
  2153. Folders: folders,
  2154. }
  2155. c := RetentionChecks.Add(check, &user)
  2156. if c == nil {
  2157. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  2158. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  2159. }
  2160. defer func() {
  2161. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  2162. Username: user.Username,
  2163. ActionName: actionName,
  2164. Results: c.results,
  2165. })
  2166. }()
  2167. if err := c.Start(); err != nil {
  2168. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  2169. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  2170. }
  2171. return nil
  2172. }
  2173. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  2174. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  2175. ) error {
  2176. users, err := params.getUsers()
  2177. if err != nil {
  2178. return fmt.Errorf("unable to get users: %w", err)
  2179. }
  2180. var failures []string
  2181. executed := 0
  2182. for _, user := range users {
  2183. // if sender is set, the conditions have already been evaluated
  2184. if params.sender == "" {
  2185. if !checkUserConditionOptions(&user, &conditions) {
  2186. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  2187. user.Username)
  2188. continue
  2189. }
  2190. }
  2191. executed++
  2192. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  2193. failures = append(failures, user.Username)
  2194. params.AddError(err)
  2195. }
  2196. }
  2197. if len(failures) > 0 {
  2198. return fmt.Errorf("retention check failed for users: %s", strings.Join(failures, ", "))
  2199. }
  2200. if executed == 0 {
  2201. eventManagerLog(logger.LevelError, "no retention check executed")
  2202. return errors.New("no retention check executed")
  2203. }
  2204. return nil
  2205. }
  2206. func executeUserExpirationCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  2207. users, err := params.getUsers()
  2208. if err != nil {
  2209. return fmt.Errorf("unable to get users: %w", err)
  2210. }
  2211. var failures []string
  2212. var executed int
  2213. for _, user := range users {
  2214. // if sender is set, the conditions have already been evaluated
  2215. if params.sender == "" {
  2216. if !checkUserConditionOptions(&user, &conditions) {
  2217. eventManagerLog(logger.LevelDebug, "skipping expiration check for user %q, condition options don't match",
  2218. user.Username)
  2219. continue
  2220. }
  2221. }
  2222. executed++
  2223. if user.ExpirationDate > 0 {
  2224. expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate)
  2225. if expDate.Before(time.Now()) {
  2226. failures = append(failures, user.Username)
  2227. }
  2228. }
  2229. }
  2230. if len(failures) > 0 {
  2231. return fmt.Errorf("expired users: %s", strings.Join(failures, ", "))
  2232. }
  2233. if executed == 0 {
  2234. eventManagerLog(logger.LevelError, "no user expiration check executed")
  2235. return errors.New("no user expiration check executed")
  2236. }
  2237. return nil
  2238. }
  2239. func executeInactivityCheckForUser(user *dataprovider.User, config dataprovider.EventActionUserInactivity, when time.Time) error {
  2240. if config.DeleteThreshold > 0 && (user.Status == 0 || config.DisableThreshold == 0) {
  2241. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DeleteThreshold {
  2242. err := dataprovider.DeleteUser(user.Username, dataprovider.ActionExecutorSystem, "", "")
  2243. eventManagerLog(logger.LevelInfo, "deleting inactive user %q, days of inactivity: %d/%d, err: %v",
  2244. user.Username, inactivityDays, config.DeleteThreshold, err)
  2245. if err != nil {
  2246. return fmt.Errorf("unable to delete inactive user %q", user.Username)
  2247. }
  2248. return fmt.Errorf("inactive user %q deleted. Number of days of inactivity: %d", user.Username, inactivityDays)
  2249. }
  2250. }
  2251. if config.DisableThreshold > 0 && user.Status > 0 {
  2252. if inactivityDays := user.InactivityDays(when); inactivityDays > config.DisableThreshold {
  2253. user.Status = 0
  2254. err := dataprovider.UpdateUser(user, dataprovider.ActionExecutorSystem, "", "")
  2255. eventManagerLog(logger.LevelInfo, "disabling inactive user %q, days of inactivity: %d/%d, err: %v",
  2256. user.Username, inactivityDays, config.DisableThreshold, err)
  2257. if err != nil {
  2258. return fmt.Errorf("unable to disable inactive user %q", user.Username)
  2259. }
  2260. return fmt.Errorf("inactive user %q disabled. Number of days of inactivity: %d", user.Username, inactivityDays)
  2261. }
  2262. }
  2263. return nil
  2264. }
  2265. func executeUserInactivityCheckRuleAction(config dataprovider.EventActionUserInactivity,
  2266. conditions dataprovider.ConditionOptions,
  2267. params *EventParams,
  2268. when time.Time,
  2269. ) error {
  2270. users, err := params.getUsers()
  2271. if err != nil {
  2272. return fmt.Errorf("unable to get users: %w", err)
  2273. }
  2274. var failures []string
  2275. for _, user := range users {
  2276. // if sender is set, the conditions have already been evaluated
  2277. if params.sender == "" {
  2278. if !checkUserConditionOptions(&user, &conditions) {
  2279. eventManagerLog(logger.LevelDebug, "skipping inactivity check for user %q, condition options don't match",
  2280. user.Username)
  2281. continue
  2282. }
  2283. }
  2284. if err = executeInactivityCheckForUser(&user, config, when); err != nil {
  2285. params.AddError(err)
  2286. failures = append(failures, user.Username)
  2287. }
  2288. }
  2289. if len(failures) > 0 {
  2290. return fmt.Errorf("executed inactivity check actions for users: %s", strings.Join(failures, ", "))
  2291. }
  2292. return nil
  2293. }
  2294. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  2295. if err := user.LoadAndApplyGroupSettings(); err != nil {
  2296. eventManagerLog(logger.LevelError, "skipping password expiration check for user %q, cannot apply group settings: %v",
  2297. user.Username, err)
  2298. return err
  2299. }
  2300. if user.ExpirationDate > 0 {
  2301. if expDate := util.GetTimeFromMsecSinceEpoch(user.ExpirationDate); expDate.Before(time.Now()) {
  2302. eventManagerLog(logger.LevelDebug, "skipping password expiration check for expired user %q, expiration date: %s",
  2303. user.Username, expDate)
  2304. return nil
  2305. }
  2306. }
  2307. if user.Filters.PasswordExpiration == 0 {
  2308. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  2309. return nil
  2310. }
  2311. days := user.PasswordExpiresIn()
  2312. if days > config.Threshold {
  2313. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  2314. user.Username, days, config.Threshold)
  2315. return nil
  2316. }
  2317. body := new(bytes.Buffer)
  2318. data := make(map[string]any)
  2319. data["Username"] = user.Username
  2320. data["Days"] = days
  2321. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  2322. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  2323. user.Username, err)
  2324. return err
  2325. }
  2326. subject := "SFTPGo password expiration notification"
  2327. startTime := time.Now()
  2328. if err := smtp.SendEmail([]string{user.Email}, nil, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  2329. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  2330. user.Username, err, time.Since(startTime))
  2331. return err
  2332. }
  2333. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2334. user.Username, days, time.Since(startTime))
  2335. return nil
  2336. }
  2337. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2338. params *EventParams) error {
  2339. users, err := params.getUsers()
  2340. if err != nil {
  2341. return fmt.Errorf("unable to get users: %w", err)
  2342. }
  2343. var failures []string
  2344. for _, user := range users {
  2345. // if sender is set, the conditions have already been evaluated
  2346. if params.sender == "" {
  2347. if !checkUserConditionOptions(&user, &conditions) {
  2348. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2349. user.Username)
  2350. continue
  2351. }
  2352. }
  2353. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2354. params.AddError(err)
  2355. failures = append(failures, user.Username)
  2356. }
  2357. }
  2358. if len(failures) > 0 {
  2359. return fmt.Errorf("password expiration check failed for users: %s", strings.Join(failures, ", "))
  2360. }
  2361. return nil
  2362. }
  2363. func executeAdminCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.Admin, error) {
  2364. admin, err := dataprovider.AdminExists(params.Name)
  2365. exists := err == nil
  2366. if exists && c.Mode == 1 {
  2367. return &admin, nil
  2368. }
  2369. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2370. return nil, err
  2371. }
  2372. replacements := params.getStringReplacements(false, true)
  2373. replacer := strings.NewReplacer(replacements...)
  2374. data := replaceWithReplacer(c.TemplateAdmin, replacer)
  2375. var newAdmin dataprovider.Admin
  2376. err = json.Unmarshal(util.StringToBytes(data), &newAdmin)
  2377. if err != nil {
  2378. return nil, err
  2379. }
  2380. if exists {
  2381. eventManagerLog(logger.LevelDebug, "updating admin %q after IDP login", params.Name)
  2382. // Not sure if this makes sense, but it shouldn't hurt.
  2383. if newAdmin.Password == "" {
  2384. newAdmin.Password = admin.Password
  2385. }
  2386. newAdmin.Filters.TOTPConfig = admin.Filters.TOTPConfig
  2387. newAdmin.Filters.RecoveryCodes = admin.Filters.RecoveryCodes
  2388. err = dataprovider.UpdateAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2389. } else {
  2390. eventManagerLog(logger.LevelDebug, "creating admin %q after IDP login", params.Name)
  2391. if newAdmin.Password == "" {
  2392. newAdmin.Password = util.GenerateUniqueID()
  2393. }
  2394. err = dataprovider.AddAdmin(&newAdmin, dataprovider.ActionExecutorSystem, "", "")
  2395. }
  2396. return &newAdmin, err
  2397. }
  2398. func preserveUserProfile(user, newUser *dataprovider.User) {
  2399. if newUser.CanChangePassword() && user.Password != "" {
  2400. newUser.Password = user.Password
  2401. }
  2402. if newUser.CanManagePublicKeys() && len(user.PublicKeys) > 0 {
  2403. newUser.PublicKeys = user.PublicKeys
  2404. }
  2405. if newUser.CanManageTLSCerts() {
  2406. if len(user.Filters.TLSCerts) > 0 {
  2407. newUser.Filters.TLSCerts = user.Filters.TLSCerts
  2408. }
  2409. }
  2410. if newUser.CanChangeInfo() {
  2411. if user.Description != "" {
  2412. newUser.Description = user.Description
  2413. }
  2414. if user.Email != "" {
  2415. newUser.Email = user.Email
  2416. }
  2417. }
  2418. if newUser.CanChangeAPIKeyAuth() {
  2419. newUser.Filters.AllowAPIKeyAuth = user.Filters.AllowAPIKeyAuth
  2420. }
  2421. newUser.Filters.RecoveryCodes = user.Filters.RecoveryCodes
  2422. newUser.Filters.TOTPConfig = user.Filters.TOTPConfig
  2423. newUser.LastPasswordChange = user.LastPasswordChange
  2424. newUser.SetEmptySecretsIfNil()
  2425. }
  2426. func executeUserCheckAction(c *dataprovider.EventActionIDPAccountCheck, params *EventParams) (*dataprovider.User, error) {
  2427. user, err := dataprovider.UserExists(params.Name, "")
  2428. exists := err == nil
  2429. if exists && c.Mode == 1 {
  2430. err = user.LoadAndApplyGroupSettings()
  2431. return &user, err
  2432. }
  2433. if err != nil && !errors.Is(err, util.ErrNotFound) {
  2434. return nil, err
  2435. }
  2436. replacements := params.getStringReplacements(false, true)
  2437. replacer := strings.NewReplacer(replacements...)
  2438. data := replaceWithReplacer(c.TemplateUser, replacer)
  2439. var newUser dataprovider.User
  2440. err = json.Unmarshal(util.StringToBytes(data), &newUser)
  2441. if err != nil {
  2442. return nil, err
  2443. }
  2444. if exists {
  2445. eventManagerLog(logger.LevelDebug, "updating user %q after IDP login", params.Name)
  2446. preserveUserProfile(&user, &newUser)
  2447. err = dataprovider.UpdateUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2448. } else {
  2449. eventManagerLog(logger.LevelDebug, "creating user %q after IDP login", params.Name)
  2450. err = dataprovider.AddUser(&newUser, dataprovider.ActionExecutorSystem, "", "")
  2451. }
  2452. if err != nil {
  2453. return nil, err
  2454. }
  2455. u, err := dataprovider.GetUserWithGroupSettings(params.Name, "")
  2456. return &u, err
  2457. }
  2458. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams, //nolint:gocyclo
  2459. conditions dataprovider.ConditionOptions,
  2460. ) error {
  2461. if len(conditions.EventStatuses) > 0 && !slices.Contains(conditions.EventStatuses, params.Status) {
  2462. eventManagerLog(logger.LevelDebug, "skipping action %s, event status %d does not match: %v",
  2463. action.Name, params.Status, conditions.EventStatuses)
  2464. return nil
  2465. }
  2466. var err error
  2467. switch action.Type {
  2468. case dataprovider.ActionTypeHTTP:
  2469. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2470. case dataprovider.ActionTypeCommand:
  2471. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2472. case dataprovider.ActionTypeEmail:
  2473. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2474. case dataprovider.ActionTypeBackup:
  2475. var backupPath string
  2476. backupPath, err = dataprovider.ExecuteBackup()
  2477. if err == nil {
  2478. params.setBackupParams(backupPath)
  2479. }
  2480. case dataprovider.ActionTypeUserQuotaReset:
  2481. err = executeUsersQuotaResetRuleAction(conditions, params)
  2482. case dataprovider.ActionTypeFolderQuotaReset:
  2483. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2484. case dataprovider.ActionTypeTransferQuotaReset:
  2485. err = executeTransferQuotaResetRuleAction(conditions, params)
  2486. case dataprovider.ActionTypeDataRetentionCheck:
  2487. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2488. case dataprovider.ActionTypeFilesystem:
  2489. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2490. case dataprovider.ActionTypePasswordExpirationCheck:
  2491. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2492. case dataprovider.ActionTypeUserExpirationCheck:
  2493. err = executeUserExpirationCheckRuleAction(conditions, params)
  2494. case dataprovider.ActionTypeUserInactivityCheck:
  2495. err = executeUserInactivityCheckRuleAction(action.Options.UserInactivityConfig, conditions, params, time.Now())
  2496. case dataprovider.ActionTypeRotateLogs:
  2497. err = logger.RotateLogFile()
  2498. default:
  2499. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2500. }
  2501. if err != nil {
  2502. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2503. }
  2504. params.AddError(err)
  2505. return err
  2506. }
  2507. func executeIDPAccountCheckRule(rule dataprovider.EventRule, params EventParams) (*dataprovider.User,
  2508. *dataprovider.Admin, error,
  2509. ) {
  2510. for _, action := range rule.Actions {
  2511. if action.Type == dataprovider.ActionTypeIDPAccountCheck {
  2512. startTime := time.Now()
  2513. var user *dataprovider.User
  2514. var admin *dataprovider.Admin
  2515. var err error
  2516. var failedActions []string
  2517. paramsCopy := params.getACopy()
  2518. switch params.Event {
  2519. case IDPLoginAdmin:
  2520. admin, err = executeAdminCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2521. case IDPLoginUser:
  2522. user, err = executeUserCheckAction(&action.BaseEventAction.Options.IDPConfig, paramsCopy)
  2523. default:
  2524. err = fmt.Errorf("unsupported IDP login event: %q", params.Event)
  2525. }
  2526. if err != nil {
  2527. paramsCopy.AddError(fmt.Errorf("unable to handle %q: %w", params.Event, err))
  2528. eventManagerLog(logger.LevelError, "unable to handle IDP login event %q, err: %v", params.Event, err)
  2529. failedActions = append(failedActions, action.Name)
  2530. } else {
  2531. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2532. action.Name, rule.Name, time.Since(startTime))
  2533. }
  2534. // execute async actions if any, including failure actions
  2535. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2536. return user, admin, err
  2537. }
  2538. }
  2539. eventManagerLog(logger.LevelError, "no action executed for IDP login event %q, event rule: %q", params.Event, rule.Name)
  2540. return nil, nil, errors.New("no action executed")
  2541. }
  2542. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2543. var errRes error
  2544. for _, rule := range rules {
  2545. var failedActions []string
  2546. paramsCopy := params.getACopy()
  2547. for _, action := range rule.Actions {
  2548. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2549. startTime := time.Now()
  2550. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2551. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2552. action.Name, rule.Name, time.Since(startTime), err)
  2553. failedActions = append(failedActions, action.Name)
  2554. // we return the last error, it is ok for now
  2555. errRes = err
  2556. if action.Options.StopOnFailure {
  2557. break
  2558. }
  2559. } else {
  2560. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2561. action.Name, rule.Name, time.Since(startTime))
  2562. }
  2563. }
  2564. }
  2565. // execute async actions if any, including failure actions
  2566. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2567. }
  2568. return errRes
  2569. }
  2570. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2571. eventManager.addAsyncTask()
  2572. defer eventManager.removeAsyncTask()
  2573. params.addUID()
  2574. for _, rule := range rules {
  2575. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2576. }
  2577. }
  2578. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2579. for _, action := range rule.Actions {
  2580. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2581. startTime := time.Now()
  2582. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2583. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2584. action.Name, rule.Name, time.Since(startTime), err)
  2585. failedActions = append(failedActions, action.Name)
  2586. if action.Options.StopOnFailure {
  2587. break
  2588. }
  2589. } else {
  2590. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2591. action.Name, rule.Name, time.Since(startTime))
  2592. }
  2593. }
  2594. }
  2595. if len(failedActions) > 0 {
  2596. params.updateStatusFromError = false
  2597. // execute failure actions
  2598. for _, action := range rule.Actions {
  2599. if action.Options.IsFailureAction {
  2600. startTime := time.Now()
  2601. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2602. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2603. action.Name, rule.Name, time.Since(startTime), err)
  2604. if action.Options.StopOnFailure {
  2605. break
  2606. }
  2607. } else {
  2608. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2609. action.Name, rule.Name, time.Since(startTime))
  2610. }
  2611. }
  2612. }
  2613. }
  2614. }
  2615. type eventCronJob struct {
  2616. ruleName string
  2617. }
  2618. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2619. if rule.GuardFromConcurrentExecution() {
  2620. task, err := dataprovider.GetTaskByName(rule.Name)
  2621. if err != nil {
  2622. if errors.Is(err, util.ErrNotFound) {
  2623. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2624. task = dataprovider.Task{
  2625. Name: rule.Name,
  2626. UpdateAt: 0,
  2627. Version: 0,
  2628. }
  2629. err = dataprovider.AddTask(rule.Name)
  2630. if err != nil {
  2631. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2632. return task, err
  2633. }
  2634. } else {
  2635. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2636. }
  2637. }
  2638. return task, err
  2639. }
  2640. return dataprovider.Task{}, nil
  2641. }
  2642. func (j *eventCronJob) Run() {
  2643. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2644. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2645. if err != nil {
  2646. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2647. return
  2648. }
  2649. if err := rule.CheckActionsConsistency(""); err != nil {
  2650. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2651. return
  2652. }
  2653. task, err := j.getTask(&rule)
  2654. if err != nil {
  2655. return
  2656. }
  2657. if task.Name != "" {
  2658. updateInterval := 5 * time.Minute
  2659. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2660. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2661. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2662. return
  2663. }
  2664. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2665. if err != nil {
  2666. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2667. rule.Name, err)
  2668. return
  2669. }
  2670. ticker := time.NewTicker(updateInterval)
  2671. done := make(chan bool)
  2672. defer func() {
  2673. done <- true
  2674. ticker.Stop()
  2675. }()
  2676. go func(taskName string) {
  2677. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2678. for {
  2679. select {
  2680. case <-done:
  2681. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2682. return
  2683. case <-ticker.C:
  2684. err := dataprovider.UpdateTaskTimestamp(taskName)
  2685. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2686. }
  2687. }
  2688. }(task.Name)
  2689. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2690. } else {
  2691. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2692. }
  2693. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2694. }
  2695. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2696. func RunOnDemandRule(name string) error {
  2697. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2698. rule, err := dataprovider.EventRuleExists(name)
  2699. if err != nil {
  2700. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2701. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2702. }
  2703. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2704. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2705. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2706. }
  2707. if rule.Status != 1 {
  2708. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2709. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2710. }
  2711. if err := rule.CheckActionsConsistency(""); err != nil {
  2712. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2713. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2714. }
  2715. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2716. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2717. return nil
  2718. }
  2719. type zipWriterWrapper struct {
  2720. Name string
  2721. Entries map[string]bool
  2722. Writer *zip.Writer
  2723. }
  2724. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2725. logger.Log(level, "eventmanager", "", format, v...)
  2726. }