notifier.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  // Copyright (C) 2019 Nicola Murino
  //
  // This program is free software: you can redistribute it and/or modify
  // it under the terms of the GNU Affero General Public License as published
  // by the Free Software Foundation, version 3.
  //
  // This program is distributed in the hope that it will be useful,
  // but WITHOUT ANY WARRANTY; without even the implied warranty of
  // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  // GNU Affero General Public License for more details.
  //
  // You should have received a copy of the GNU Affero General Public License
  // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package plugin
  15. import (
  16. "fmt"
  17. "sync"
  18. "time"
  19. "github.com/hashicorp/go-hclog"
  20. "github.com/hashicorp/go-plugin"
  21. "github.com/sftpgo/sdk/plugin/notifier"
  22. "github.com/drakkan/sftpgo/v2/internal/logger"
  23. "github.com/drakkan/sftpgo/v2/internal/util"
  24. )
  // NotifierConfig defines configuration parameters for notifiers plugins.
  type NotifierConfig struct {
  	// FsEvents lists the filesystem actions the plugin is notified about.
  	FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
  	// ProviderEvents lists the provider actions the plugin is notified about.
  	// It is only effective together with a non-empty ProviderObjects.
  	ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
  	// ProviderObjects lists the provider object types the plugin is notified
  	// about. It is only effective together with a non-empty ProviderEvents.
  	ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
  	// LogEvents lists the numeric identifiers of the log events to forward.
  	LogEvents []int `json:"log_events" mapstructure:"log_events"`
  	// RetryMaxTime is the maximum age, in seconds, an undelivered event may
  	// reach and still be queued for a retry. Zero disables the retry queue.
  	RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
  	// RetryQueueMaxSize caps the number of events waiting for a retry.
  	// A value that is not positive leaves the queue unbounded.
  	RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
  }

  // hasActions reports whether the configuration selects at least one kind of
  // event: any filesystem event, any log event, or a provider event paired
  // with at least one provider object.
  func (c *NotifierConfig) hasActions() bool {
  	switch {
  	case len(c.FsEvents) > 0:
  		return true
  	case len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0:
  		// provider notifications need both an action and an object type
  		return true
  	default:
  		return len(c.LogEvents) > 0
  	}
  }
  46. type eventsQueue struct {
  47. sync.RWMutex
  48. fsEvents []*notifier.FsEvent
  49. providerEvents []*notifier.ProviderEvent
  50. logEvents []*notifier.LogEvent
  51. }
  52. func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
  53. q.Lock()
  54. defer q.Unlock()
  55. q.fsEvents = append(q.fsEvents, event)
  56. }
  57. func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
  58. q.Lock()
  59. defer q.Unlock()
  60. q.providerEvents = append(q.providerEvents, event)
  61. }
  62. func (q *eventsQueue) addLogEvent(event *notifier.LogEvent) {
  63. q.Lock()
  64. defer q.Unlock()
  65. q.logEvents = append(q.logEvents, event)
  66. }
  67. func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
  68. q.Lock()
  69. defer q.Unlock()
  70. if len(q.fsEvents) == 0 {
  71. return nil
  72. }
  73. truncLen := len(q.fsEvents) - 1
  74. ev := q.fsEvents[truncLen]
  75. q.fsEvents[truncLen] = nil
  76. q.fsEvents = q.fsEvents[:truncLen]
  77. return ev
  78. }
  79. func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
  80. q.Lock()
  81. defer q.Unlock()
  82. if len(q.providerEvents) == 0 {
  83. return nil
  84. }
  85. truncLen := len(q.providerEvents) - 1
  86. ev := q.providerEvents[truncLen]
  87. q.providerEvents[truncLen] = nil
  88. q.providerEvents = q.providerEvents[:truncLen]
  89. return ev
  90. }
  91. func (q *eventsQueue) popLogEvent() *notifier.LogEvent {
  92. q.Lock()
  93. defer q.Unlock()
  94. if len(q.logEvents) == 0 {
  95. return nil
  96. }
  97. truncLen := len(q.logEvents) - 1
  98. ev := q.logEvents[truncLen]
  99. q.logEvents[truncLen] = nil
  100. q.logEvents = q.logEvents[:truncLen]
  101. return ev
  102. }
  103. func (q *eventsQueue) getSize() int {
  104. q.RLock()
  105. defer q.RUnlock()
  106. return len(q.providerEvents) + len(q.fsEvents) + len(q.logEvents)
  107. }
  108. type notifierPlugin struct {
  109. config Config
  110. notifier notifier.Notifier
  111. client *plugin.Client
  112. queue *eventsQueue
  113. }
  114. func newNotifierPlugin(config Config) (*notifierPlugin, error) {
  115. p := &notifierPlugin{
  116. config: config,
  117. queue: &eventsQueue{},
  118. }
  119. if err := p.initialize(); err != nil {
  120. logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
  121. return nil, err
  122. }
  123. return p, nil
  124. }
  125. func (p *notifierPlugin) exited() bool {
  126. return p.client.Exited()
  127. }
  128. func (p *notifierPlugin) cleanup() {
  129. p.client.Kill()
  130. }
  131. func (p *notifierPlugin) initialize() error {
  132. killProcess(p.config.Cmd)
  133. logger.Debug(logSender, "", "create new notifier plugin %q", p.config.Cmd)
  134. if !p.config.NotifierOptions.hasActions() {
  135. return fmt.Errorf("no actions defined for the notifier plugin %q", p.config.Cmd)
  136. }
  137. secureConfig, err := p.config.getSecureConfig()
  138. if err != nil {
  139. return err
  140. }
  141. client := plugin.NewClient(&plugin.ClientConfig{
  142. HandshakeConfig: notifier.Handshake,
  143. Plugins: notifier.PluginMap,
  144. Cmd: p.config.getCommand(),
  145. SkipHostEnv: true,
  146. AllowedProtocols: []plugin.Protocol{
  147. plugin.ProtocolGRPC,
  148. },
  149. AutoMTLS: p.config.AutoMTLS,
  150. SecureConfig: secureConfig,
  151. Managed: false,
  152. Logger: &logger.HCLogAdapter{
  153. Logger: hclog.New(&hclog.LoggerOptions{
  154. Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName),
  155. Level: pluginsLogLevel,
  156. DisableTime: true,
  157. }),
  158. },
  159. })
  160. rpcClient, err := client.Client()
  161. if err != nil {
  162. logger.Debug(logSender, "", "unable to get rpc client for plugin %q: %v", p.config.Cmd, err)
  163. return err
  164. }
  165. raw, err := rpcClient.Dispense(notifier.PluginName)
  166. if err != nil {
  167. logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %q: %v",
  168. notifier.PluginName, p.config.Cmd, err)
  169. return err
  170. }
  171. p.client = client
  172. p.notifier = raw.(notifier.Notifier)
  173. return nil
  174. }
  175. func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
  176. if p.config.NotifierOptions.RetryMaxTime == 0 {
  177. return false
  178. }
  179. if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
  180. logger.Warn(logSender, "", "dropping too late event for plugin %v, event timestamp: %v",
  181. p.config.Cmd, time.Unix(0, timestamp))
  182. return false
  183. }
  184. if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
  185. return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
  186. }
  187. return true
  188. }
  189. func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
  190. if !util.Contains(p.config.NotifierOptions.FsEvents, event.Action) {
  191. return
  192. }
  193. go func() {
  194. Handler.addTask()
  195. defer Handler.removeTask()
  196. p.sendFsEvent(event)
  197. }()
  198. }
  199. func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
  200. if !util.Contains(p.config.NotifierOptions.ProviderEvents, event.Action) ||
  201. !util.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) {
  202. return
  203. }
  204. go func() {
  205. Handler.addTask()
  206. defer Handler.removeTask()
  207. objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
  208. if err != nil {
  209. logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err)
  210. return
  211. }
  212. event.ObjectData = objectAsJSON
  213. p.sendProviderEvent(event)
  214. }()
  215. }
  216. func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) {
  217. go func() {
  218. Handler.addTask()
  219. defer Handler.removeTask()
  220. p.sendLogEvent(event)
  221. }()
  222. }
  223. func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
  224. if err := p.notifier.NotifyFsEvent(event); err != nil {
  225. logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
  226. if p.canQueueEvent(event.Timestamp) {
  227. p.queue.addFsEvent(event)
  228. }
  229. }
  230. }
  231. func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) {
  232. if err := p.notifier.NotifyProviderEvent(event); err != nil {
  233. logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
  234. if p.canQueueEvent(event.Timestamp) {
  235. p.queue.addProviderEvent(event)
  236. }
  237. }
  238. }
  239. func (p *notifierPlugin) sendLogEvent(event *notifier.LogEvent) {
  240. if err := p.notifier.NotifyLogEvent(event); err != nil {
  241. logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err)
  242. if p.canQueueEvent(event.Timestamp) {
  243. p.queue.addLogEvent(event)
  244. }
  245. }
  246. }
  247. func (p *notifierPlugin) sendQueuedEvents() {
  248. queueSize := p.queue.getSize()
  249. if queueSize == 0 {
  250. return
  251. }
  252. logger.Debug(logSender, "", "check queued events for notifier %q, events size: %v", p.config.Cmd, queueSize)
  253. fsEv := p.queue.popFsEvent()
  254. for fsEv != nil {
  255. go func(ev *notifier.FsEvent) {
  256. p.sendFsEvent(ev)
  257. }(fsEv)
  258. fsEv = p.queue.popFsEvent()
  259. }
  260. providerEv := p.queue.popProviderEvent()
  261. for providerEv != nil {
  262. go func(ev *notifier.ProviderEvent) {
  263. p.sendProviderEvent(ev)
  264. }(providerEv)
  265. providerEv = p.queue.popProviderEvent()
  266. }
  267. logEv := p.queue.popLogEvent()
  268. for logEv != nil {
  269. go func(ev *notifier.LogEvent) {
  270. p.sendLogEvent(ev)
  271. }(logEv)
  272. logEv = p.queue.popLogEvent()
  273. }
  274. logger.Debug(logSender, "", "queued events sent for notifier %q, new events size: %v", p.config.Cmd, p.queue.getSize())
  275. }