notifier.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package plugin
  2. import (
  3. "crypto/sha256"
  4. "fmt"
  5. "os/exec"
  6. "sync"
  7. "time"
  8. "github.com/hashicorp/go-hclog"
  9. "github.com/hashicorp/go-plugin"
  10. "github.com/sftpgo/sdk/plugin/notifier"
  11. "github.com/drakkan/sftpgo/v2/logger"
  12. "github.com/drakkan/sftpgo/v2/util"
  13. )
  14. // NotifierConfig defines configuration parameters for notifiers plugins
  15. type NotifierConfig struct {
  16. FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
  17. ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
  18. ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
  19. RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
  20. RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
  21. }
  22. func (c *NotifierConfig) hasActions() bool {
  23. if len(c.FsEvents) > 0 {
  24. return true
  25. }
  26. if len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0 {
  27. return true
  28. }
  29. return false
  30. }
  31. type eventsQueue struct {
  32. sync.RWMutex
  33. fsEvents []*notifier.FsEvent
  34. providerEvents []*notifier.ProviderEvent
  35. }
  36. func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
  37. q.Lock()
  38. defer q.Unlock()
  39. q.fsEvents = append(q.fsEvents, event)
  40. }
  41. func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
  42. q.Lock()
  43. defer q.Unlock()
  44. q.providerEvents = append(q.providerEvents, event)
  45. }
  46. func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
  47. q.Lock()
  48. defer q.Unlock()
  49. if len(q.fsEvents) == 0 {
  50. return nil
  51. }
  52. truncLen := len(q.fsEvents) - 1
  53. ev := q.fsEvents[truncLen]
  54. q.fsEvents[truncLen] = nil
  55. q.fsEvents = q.fsEvents[:truncLen]
  56. return ev
  57. }
  58. func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
  59. q.Lock()
  60. defer q.Unlock()
  61. if len(q.providerEvents) == 0 {
  62. return nil
  63. }
  64. truncLen := len(q.providerEvents) - 1
  65. ev := q.providerEvents[truncLen]
  66. q.providerEvents[truncLen] = nil
  67. q.providerEvents = q.providerEvents[:truncLen]
  68. return ev
  69. }
  70. func (q *eventsQueue) getSize() int {
  71. q.RLock()
  72. defer q.RUnlock()
  73. return len(q.providerEvents) + len(q.fsEvents)
  74. }
  75. type notifierPlugin struct {
  76. config Config
  77. notifier notifier.Notifier
  78. client *plugin.Client
  79. queue *eventsQueue
  80. }
  81. func newNotifierPlugin(config Config) (*notifierPlugin, error) {
  82. p := &notifierPlugin{
  83. config: config,
  84. queue: &eventsQueue{},
  85. }
  86. if err := p.initialize(); err != nil {
  87. logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
  88. return nil, err
  89. }
  90. return p, nil
  91. }
  92. func (p *notifierPlugin) exited() bool {
  93. return p.client.Exited()
  94. }
  95. func (p *notifierPlugin) cleanup() {
  96. p.client.Kill()
  97. }
  98. func (p *notifierPlugin) initialize() error {
  99. killProcess(p.config.Cmd)
  100. logger.Debug(logSender, "", "create new notifier plugin %#v", p.config.Cmd)
  101. if !p.config.NotifierOptions.hasActions() {
  102. return fmt.Errorf("no actions defined for the notifier plugin %#v", p.config.Cmd)
  103. }
  104. var secureConfig *plugin.SecureConfig
  105. if p.config.SHA256Sum != "" {
  106. secureConfig.Checksum = []byte(p.config.SHA256Sum)
  107. secureConfig.Hash = sha256.New()
  108. }
  109. client := plugin.NewClient(&plugin.ClientConfig{
  110. HandshakeConfig: notifier.Handshake,
  111. Plugins: notifier.PluginMap,
  112. Cmd: exec.Command(p.config.Cmd, p.config.Args...),
  113. AllowedProtocols: []plugin.Protocol{
  114. plugin.ProtocolGRPC,
  115. },
  116. AutoMTLS: p.config.AutoMTLS,
  117. SecureConfig: secureConfig,
  118. Managed: false,
  119. Logger: &logger.HCLogAdapter{
  120. Logger: hclog.New(&hclog.LoggerOptions{
  121. Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName),
  122. Level: pluginsLogLevel,
  123. DisableTime: true,
  124. }),
  125. },
  126. })
  127. rpcClient, err := client.Client()
  128. if err != nil {
  129. logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
  130. return err
  131. }
  132. raw, err := rpcClient.Dispense(notifier.PluginName)
  133. if err != nil {
  134. logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
  135. notifier.PluginName, p.config.Cmd, err)
  136. return err
  137. }
  138. p.client = client
  139. p.notifier = raw.(notifier.Notifier)
  140. return nil
  141. }
  142. func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
  143. if p.config.NotifierOptions.RetryMaxTime == 0 {
  144. return false
  145. }
  146. if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
  147. logger.Warn(logSender, "", "dropping too late event for plugin %v, event timestamp: %v",
  148. p.config.Cmd, time.Unix(0, timestamp))
  149. return false
  150. }
  151. if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
  152. return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
  153. }
  154. return true
  155. }
  156. func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
  157. if !util.Contains(p.config.NotifierOptions.FsEvents, event.Action) {
  158. return
  159. }
  160. go func() {
  161. p.sendFsEvent(event)
  162. }()
  163. }
  164. func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
  165. if !util.Contains(p.config.NotifierOptions.ProviderEvents, event.Action) ||
  166. !util.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) {
  167. return
  168. }
  169. go func() {
  170. objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
  171. if err != nil {
  172. logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err)
  173. return
  174. }
  175. event.ObjectData = objectAsJSON
  176. p.sendProviderEvent(event)
  177. }()
  178. }
  179. func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
  180. if err := p.notifier.NotifyFsEvent(event); err != nil {
  181. logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
  182. if p.canQueueEvent(event.Timestamp) {
  183. p.queue.addFsEvent(event)
  184. }
  185. }
  186. }
  187. func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) {
  188. if err := p.notifier.NotifyProviderEvent(event); err != nil {
  189. logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
  190. if p.canQueueEvent(event.Timestamp) {
  191. p.queue.addProviderEvent(event)
  192. }
  193. }
  194. }
  195. func (p *notifierPlugin) sendQueuedEvents() {
  196. queueSize := p.queue.getSize()
  197. if queueSize == 0 {
  198. return
  199. }
  200. logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
  201. fsEv := p.queue.popFsEvent()
  202. for fsEv != nil {
  203. go func(ev *notifier.FsEvent) {
  204. p.sendFsEvent(ev)
  205. }(fsEv)
  206. fsEv = p.queue.popFsEvent()
  207. }
  208. providerEv := p.queue.popProviderEvent()
  209. for providerEv != nil {
  210. go func(ev *notifier.ProviderEvent) {
  211. p.sendProviderEvent(ev)
  212. }(providerEv)
  213. providerEv = p.queue.popProviderEvent()
  214. }
  215. logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
  216. }