notifier.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package plugin
  2. import (
  3. "crypto/sha256"
  4. "fmt"
  5. "os/exec"
  6. "path/filepath"
  7. "sync"
  8. "time"
  9. "github.com/hashicorp/go-hclog"
  10. "github.com/hashicorp/go-plugin"
  11. "github.com/drakkan/sftpgo/v2/logger"
  12. "github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
  13. "github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto"
  14. "github.com/drakkan/sftpgo/v2/util"
  15. )
  16. // NotifierConfig defines configuration parameters for notifiers plugins
  17. type NotifierConfig struct {
  18. FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
  19. ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
  20. ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
  21. RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
  22. RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
  23. }
  24. func (c *NotifierConfig) hasActions() bool {
  25. if len(c.FsEvents) > 0 {
  26. return true
  27. }
  28. if len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0 {
  29. return true
  30. }
  31. return false
  32. }
  33. type eventsQueue struct {
  34. sync.RWMutex
  35. fsEvents []*proto.FsEvent
  36. providerEvents []*proto.ProviderEvent
  37. }
  38. func (q *eventsQueue) addFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip string,
  39. fileSize int64, status int,
  40. ) {
  41. q.Lock()
  42. defer q.Unlock()
  43. q.fsEvents = append(q.fsEvents, &proto.FsEvent{
  44. Timestamp: timestamp,
  45. Action: action,
  46. Username: username,
  47. FsPath: fsPath,
  48. FsTargetPath: fsTargetPath,
  49. SshCmd: sshCmd,
  50. FileSize: fileSize,
  51. Protocol: protocol,
  52. Ip: ip,
  53. Status: int32(status),
  54. })
  55. }
  56. func (q *eventsQueue) addProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
  57. objectAsJSON []byte,
  58. ) {
  59. q.Lock()
  60. defer q.Unlock()
  61. q.providerEvents = append(q.providerEvents, &proto.ProviderEvent{
  62. Timestamp: timestamp,
  63. Action: action,
  64. ObjectType: objectType,
  65. Username: username,
  66. Ip: ip,
  67. ObjectName: objectName,
  68. ObjectData: objectAsJSON,
  69. })
  70. }
  71. func (q *eventsQueue) popFsEvent() *proto.FsEvent {
  72. q.Lock()
  73. defer q.Unlock()
  74. if len(q.fsEvents) == 0 {
  75. return nil
  76. }
  77. truncLen := len(q.fsEvents) - 1
  78. ev := q.fsEvents[truncLen]
  79. q.fsEvents[truncLen] = nil
  80. q.fsEvents = q.fsEvents[:truncLen]
  81. return ev
  82. }
  83. func (q *eventsQueue) popProviderEvent() *proto.ProviderEvent {
  84. q.Lock()
  85. defer q.Unlock()
  86. if len(q.providerEvents) == 0 {
  87. return nil
  88. }
  89. truncLen := len(q.providerEvents) - 1
  90. ev := q.providerEvents[truncLen]
  91. q.providerEvents[truncLen] = nil
  92. q.providerEvents = q.providerEvents[:truncLen]
  93. return ev
  94. }
  95. func (q *eventsQueue) getSize() int {
  96. q.RLock()
  97. defer q.RUnlock()
  98. return len(q.providerEvents) + len(q.fsEvents)
  99. }
  100. type notifierPlugin struct {
  101. config Config
  102. notifier notifier.Notifier
  103. client *plugin.Client
  104. queue *eventsQueue
  105. }
  106. func newNotifierPlugin(config Config) (*notifierPlugin, error) {
  107. p := &notifierPlugin{
  108. config: config,
  109. queue: &eventsQueue{},
  110. }
  111. if err := p.initialize(); err != nil {
  112. logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
  113. return nil, err
  114. }
  115. return p, nil
  116. }
  117. func (p *notifierPlugin) exited() bool {
  118. return p.client.Exited()
  119. }
  120. func (p *notifierPlugin) cleanup() {
  121. p.client.Kill()
  122. }
  123. func (p *notifierPlugin) initialize() error {
  124. killProcess(p.config.Cmd)
  125. logger.Debug(logSender, "", "create new notifier plugin %#v", p.config.Cmd)
  126. if !p.config.NotifierOptions.hasActions() {
  127. return fmt.Errorf("no actions defined for the notifier plugin %#v", p.config.Cmd)
  128. }
  129. var secureConfig *plugin.SecureConfig
  130. if p.config.SHA256Sum != "" {
  131. secureConfig.Checksum = []byte(p.config.SHA256Sum)
  132. secureConfig.Hash = sha256.New()
  133. }
  134. client := plugin.NewClient(&plugin.ClientConfig{
  135. HandshakeConfig: notifier.Handshake,
  136. Plugins: notifier.PluginMap,
  137. Cmd: exec.Command(p.config.Cmd, p.config.Args...),
  138. AllowedProtocols: []plugin.Protocol{
  139. plugin.ProtocolGRPC,
  140. },
  141. AutoMTLS: p.config.AutoMTLS,
  142. SecureConfig: secureConfig,
  143. Managed: false,
  144. Logger: &logger.HCLogAdapter{
  145. Logger: hclog.New(&hclog.LoggerOptions{
  146. Name: fmt.Sprintf("%v.%v.%v", logSender, notifier.PluginName, filepath.Base(p.config.Cmd)),
  147. Level: pluginsLogLevel,
  148. DisableTime: true,
  149. }),
  150. },
  151. })
  152. rpcClient, err := client.Client()
  153. if err != nil {
  154. logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
  155. return err
  156. }
  157. raw, err := rpcClient.Dispense(notifier.PluginName)
  158. if err != nil {
  159. logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
  160. notifier.PluginName, p.config.Cmd, err)
  161. return err
  162. }
  163. p.client = client
  164. p.notifier = raw.(notifier.Notifier)
  165. return nil
  166. }
  167. func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
  168. if p.config.NotifierOptions.RetryMaxTime == 0 {
  169. return false
  170. }
  171. if time.Now().After(util.GetTimeFromMsecSinceEpoch(timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
  172. return false
  173. }
  174. if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
  175. return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
  176. }
  177. return true
  178. }
  179. func (p *notifierPlugin) notifyFsAction(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
  180. protocol, ip, virtualPath, virtualTargetPath string, fileSize int64, errAction error) {
  181. if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) {
  182. return
  183. }
  184. go func() {
  185. status := 1
  186. if errAction != nil {
  187. status = 0
  188. }
  189. p.sendFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, virtualPath, virtualTargetPath,
  190. fileSize, status)
  191. }()
  192. }
  193. func (p *notifierPlugin) notifyProviderAction(timestamp int64, action, username, objectType, objectName, ip string,
  194. object Renderer,
  195. ) {
  196. if !util.IsStringInSlice(action, p.config.NotifierOptions.ProviderEvents) ||
  197. !util.IsStringInSlice(objectType, p.config.NotifierOptions.ProviderObjects) {
  198. return
  199. }
  200. go func() {
  201. objectAsJSON, err := object.RenderAsJSON(action != "delete")
  202. if err != nil {
  203. logger.Warn(logSender, "", "unable to render user as json for action %v: %v", action, err)
  204. return
  205. }
  206. p.sendProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
  207. }()
  208. }
  209. func (p *notifierPlugin) sendFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
  210. protocol, ip, virtualPath, virtualTargetPath string, fileSize int64, status int) {
  211. if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
  212. virtualPath, virtualTargetPath, fileSize, status); err != nil {
  213. logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
  214. if p.canQueueEvent(timestamp) {
  215. p.queue.addFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, fileSize, status)
  216. }
  217. }
  218. }
  219. func (p *notifierPlugin) sendProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
  220. objectAsJSON []byte,
  221. ) {
  222. if err := p.notifier.NotifyProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON); err != nil {
  223. logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
  224. if p.canQueueEvent(timestamp) {
  225. p.queue.addProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
  226. }
  227. }
  228. }
  229. func (p *notifierPlugin) sendQueuedEvents() {
  230. queueSize := p.queue.getSize()
  231. if queueSize == 0 {
  232. return
  233. }
  234. logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
  235. fsEv := p.queue.popFsEvent()
  236. for fsEv != nil {
  237. go p.sendFsEvent(fsEv.Timestamp, fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
  238. fsEv.SshCmd, fsEv.Protocol, fsEv.Ip, fsEv.VirtualPath, fsEv.VirtualTargetPath, fsEv.FileSize, int(fsEv.Status))
  239. fsEv = p.queue.popFsEvent()
  240. }
  241. providerEv := p.queue.popProviderEvent()
  242. for providerEv != nil {
  243. go p.sendProviderEvent(providerEv.Timestamp, providerEv.Action, providerEv.Username, providerEv.ObjectType,
  244. providerEv.ObjectName, providerEv.Ip, providerEv.ObjectData)
  245. providerEv = p.queue.popProviderEvent()
  246. }
  247. logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
  248. }