notifier.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package plugin
  15. import (
  16. "fmt"
  17. "slices"
  18. "sync"
  19. "time"
  20. "github.com/hashicorp/go-hclog"
  21. "github.com/hashicorp/go-plugin"
  22. "github.com/sftpgo/sdk/plugin/notifier"
  23. "github.com/drakkan/sftpgo/v2/internal/logger"
  24. )
  // NotifierConfig defines configuration parameters for notifiers plugins
  type NotifierConfig struct {
  	// FsEvents lists the filesystem actions that are forwarded to the plugin.
  	FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
  	// ProviderEvents lists the provider actions that are forwarded to the
  	// plugin. An event is forwarded only if its object type is also listed
  	// in ProviderObjects.
  	ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
  	// ProviderObjects lists the provider object types the plugin is
  	// interested in.
  	ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
  	// LogEvents lists the log event identifiers the plugin is interested in.
  	// NOTE(review): the filtering against this list is not done in this
  	// file - confirm where it happens.
  	LogEvents []int `json:"log_events" mapstructure:"log_events"`
  	// RetryMaxTime is the number of seconds, counted from the event
  	// timestamp, during which an undelivered event may be queued for a
  	// retry. 0 disables queueing.
  	RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
  	// RetryQueueMaxSize is the maximum number of events kept in the retry
  	// queues. Values <= 0 mean no limit.
  	RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
  }

  // hasActions reports whether the configuration selects at least one kind of
  // event: any fs event, any log event, or a provider event together with at
  // least one provider object.
  func (c *NotifierConfig) hasActions() bool {
  	wantsProvider := len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0
  	return len(c.FsEvents) > 0 || wantsProvider || len(c.LogEvents) > 0
  }
  46. type notifierPlugin struct {
  47. config Config
  48. notifier notifier.Notifier
  49. client *plugin.Client
  50. mu sync.RWMutex
  51. fsEvents []*notifier.FsEvent
  52. providerEvents []*notifier.ProviderEvent
  53. logEvents []*notifier.LogEvent
  54. }
  55. func newNotifierPlugin(config Config) (*notifierPlugin, error) {
  56. p := &notifierPlugin{
  57. config: config,
  58. }
  59. if err := p.initialize(); err != nil {
  60. logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
  61. return nil, err
  62. }
  63. return p, nil
  64. }
  65. func (p *notifierPlugin) exited() bool {
  66. return p.client.Exited()
  67. }
  68. func (p *notifierPlugin) cleanup() {
  69. p.client.Kill()
  70. }
  71. func (p *notifierPlugin) initialize() error {
  72. killProcess(p.config.Cmd)
  73. logger.Debug(logSender, "", "create new notifier plugin %q", p.config.Cmd)
  74. if !p.config.NotifierOptions.hasActions() {
  75. return fmt.Errorf("no actions defined for the notifier plugin %q", p.config.Cmd)
  76. }
  77. secureConfig, err := p.config.getSecureConfig()
  78. if err != nil {
  79. return err
  80. }
  81. client := plugin.NewClient(&plugin.ClientConfig{
  82. HandshakeConfig: notifier.Handshake,
  83. Plugins: notifier.PluginMap,
  84. Cmd: p.config.getCommand(),
  85. SkipHostEnv: true,
  86. AllowedProtocols: []plugin.Protocol{
  87. plugin.ProtocolGRPC,
  88. },
  89. AutoMTLS: p.config.AutoMTLS,
  90. SecureConfig: secureConfig,
  91. Managed: false,
  92. Logger: &logger.HCLogAdapter{
  93. Logger: hclog.New(&hclog.LoggerOptions{
  94. Name: fmt.Sprintf("%s.%s", logSender, notifier.PluginName),
  95. Level: pluginsLogLevel,
  96. DisableTime: true,
  97. }),
  98. },
  99. })
  100. rpcClient, err := client.Client()
  101. if err != nil {
  102. logger.Debug(logSender, "", "unable to get rpc client for plugin %q: %v", p.config.Cmd, err)
  103. return err
  104. }
  105. raw, err := rpcClient.Dispense(notifier.PluginName)
  106. if err != nil {
  107. logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %q: %v",
  108. notifier.PluginName, p.config.Cmd, err)
  109. return err
  110. }
  111. p.client = client
  112. p.notifier = raw.(notifier.Notifier)
  113. return nil
  114. }
  115. func (p *notifierPlugin) queueSize() int {
  116. p.mu.RLock()
  117. defer p.mu.RUnlock()
  118. return len(p.providerEvents) + len(p.fsEvents) + len(p.logEvents)
  119. }
  120. func (p *notifierPlugin) queueFsEvent(ev *notifier.FsEvent) {
  121. p.mu.Lock()
  122. defer p.mu.Unlock()
  123. p.fsEvents = append(p.fsEvents, ev)
  124. }
  125. func (p *notifierPlugin) queueProviderEvent(ev *notifier.ProviderEvent) {
  126. p.mu.Lock()
  127. defer p.mu.Unlock()
  128. p.providerEvents = append(p.providerEvents, ev)
  129. }
  130. func (p *notifierPlugin) queueLogEvent(ev *notifier.LogEvent) {
  131. p.mu.Lock()
  132. defer p.mu.Unlock()
  133. p.logEvents = append(p.logEvents, ev)
  134. }
  135. func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
  136. if p.config.NotifierOptions.RetryMaxTime == 0 {
  137. return false
  138. }
  139. if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
  140. logger.Warn(logSender, "", "dropping too late event for plugin %v, event timestamp: %v",
  141. p.config.Cmd, time.Unix(0, timestamp))
  142. return false
  143. }
  144. if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
  145. return p.queueSize() < p.config.NotifierOptions.RetryQueueMaxSize
  146. }
  147. return true
  148. }
  149. func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
  150. if !slices.Contains(p.config.NotifierOptions.FsEvents, event.Action) {
  151. return
  152. }
  153. p.sendFsEvent(event)
  154. }
  155. func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
  156. if !slices.Contains(p.config.NotifierOptions.ProviderEvents, event.Action) ||
  157. !slices.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) {
  158. return
  159. }
  160. p.sendProviderEvent(event, object)
  161. }
  162. func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) {
  163. p.sendLogEvent(event)
  164. }
  165. func (p *notifierPlugin) sendFsEvent(ev *notifier.FsEvent) {
  166. go func(event *notifier.FsEvent) {
  167. Handler.addTask()
  168. defer Handler.removeTask()
  169. if err := p.notifier.NotifyFsEvent(event); err != nil {
  170. logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
  171. if p.canQueueEvent(event.Timestamp) {
  172. p.queueFsEvent(event)
  173. }
  174. }
  175. }(ev)
  176. }
  177. func (p *notifierPlugin) sendProviderEvent(ev *notifier.ProviderEvent, object Renderer) {
  178. go func(event *notifier.ProviderEvent) {
  179. Handler.addTask()
  180. defer Handler.removeTask()
  181. if object != nil {
  182. objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
  183. if err != nil {
  184. logger.Error(logSender, "", "unable to render user as json for action %q: %v", event.Action, err)
  185. } else {
  186. event.ObjectData = objectAsJSON
  187. }
  188. }
  189. if err := p.notifier.NotifyProviderEvent(event); err != nil {
  190. logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
  191. if p.canQueueEvent(event.Timestamp) {
  192. p.queueProviderEvent(event)
  193. }
  194. }
  195. }(ev)
  196. }
  197. func (p *notifierPlugin) sendLogEvent(ev *notifier.LogEvent) {
  198. go func(event *notifier.LogEvent) {
  199. Handler.addTask()
  200. defer Handler.removeTask()
  201. if err := p.notifier.NotifyLogEvent(event); err != nil {
  202. logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err)
  203. if p.canQueueEvent(event.Timestamp) {
  204. p.queueLogEvent(event)
  205. }
  206. }
  207. }(ev)
  208. }
  209. func (p *notifierPlugin) sendQueuedEvents() {
  210. queueSize := p.queueSize()
  211. if queueSize == 0 {
  212. return
  213. }
  214. p.mu.Lock()
  215. defer p.mu.Unlock()
  216. logger.Debug(logSender, "", "send queued events for notifier %q, events size: %v", p.config.Cmd, queueSize)
  217. for _, ev := range p.fsEvents {
  218. p.sendFsEvent(ev)
  219. }
  220. p.fsEvents = nil
  221. for _, ev := range p.providerEvents {
  222. p.sendProviderEvent(ev, nil)
  223. }
  224. p.providerEvents = nil
  225. for _, ev := range p.logEvents {
  226. p.sendLogEvent(ev)
  227. }
  228. p.logEvents = nil
  229. logger.Debug(logSender, "", "%d queued events sent for notifier %q,", queueSize, p.config.Cmd)
  230. }