notifier.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package plugin
  2. import (
  3. "crypto/sha256"
  4. "fmt"
  5. "os/exec"
  6. "sync"
  7. "time"
  8. "github.com/hashicorp/go-hclog"
  9. "github.com/hashicorp/go-plugin"
  10. "github.com/drakkan/sftpgo/v2/logger"
  11. "github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
  12. "github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto"
  13. "github.com/drakkan/sftpgo/v2/util"
  14. )
  15. // NotifierConfig defines configuration parameters for notifiers plugins
  16. type NotifierConfig struct {
  17. FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
  18. ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
  19. ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
  20. RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
  21. RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
  22. }
  23. func (c *NotifierConfig) hasActions() bool {
  24. if len(c.FsEvents) > 0 {
  25. return true
  26. }
  27. if len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0 {
  28. return true
  29. }
  30. return false
  31. }
  32. type eventsQueue struct {
  33. sync.RWMutex
  34. fsEvents []*proto.FsEvent
  35. providerEvents []*proto.ProviderEvent
  36. }
  37. func (q *eventsQueue) addFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip string,
  38. fileSize int64, status int,
  39. ) {
  40. q.Lock()
  41. defer q.Unlock()
  42. q.fsEvents = append(q.fsEvents, &proto.FsEvent{
  43. Timestamp: timestamp,
  44. Action: action,
  45. Username: username,
  46. FsPath: fsPath,
  47. FsTargetPath: fsTargetPath,
  48. SshCmd: sshCmd,
  49. FileSize: fileSize,
  50. Protocol: protocol,
  51. Ip: ip,
  52. Status: int32(status),
  53. })
  54. }
  55. func (q *eventsQueue) addProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
  56. objectAsJSON []byte,
  57. ) {
  58. q.Lock()
  59. defer q.Unlock()
  60. q.providerEvents = append(q.providerEvents, &proto.ProviderEvent{
  61. Timestamp: timestamp,
  62. Action: action,
  63. ObjectType: objectType,
  64. Username: username,
  65. Ip: ip,
  66. ObjectName: objectName,
  67. ObjectData: objectAsJSON,
  68. })
  69. }
  70. func (q *eventsQueue) popFsEvent() *proto.FsEvent {
  71. q.Lock()
  72. defer q.Unlock()
  73. if len(q.fsEvents) == 0 {
  74. return nil
  75. }
  76. truncLen := len(q.fsEvents) - 1
  77. ev := q.fsEvents[truncLen]
  78. q.fsEvents[truncLen] = nil
  79. q.fsEvents = q.fsEvents[:truncLen]
  80. return ev
  81. }
  82. func (q *eventsQueue) popProviderEvent() *proto.ProviderEvent {
  83. q.Lock()
  84. defer q.Unlock()
  85. if len(q.providerEvents) == 0 {
  86. return nil
  87. }
  88. truncLen := len(q.providerEvents) - 1
  89. ev := q.providerEvents[truncLen]
  90. q.providerEvents[truncLen] = nil
  91. q.providerEvents = q.providerEvents[:truncLen]
  92. return ev
  93. }
  94. func (q *eventsQueue) getSize() int {
  95. q.RLock()
  96. defer q.RUnlock()
  97. return len(q.providerEvents) + len(q.fsEvents)
  98. }
  99. type notifierPlugin struct {
  100. config Config
  101. notifier notifier.Notifier
  102. client *plugin.Client
  103. queue *eventsQueue
  104. }
  105. func newNotifierPlugin(config Config) (*notifierPlugin, error) {
  106. p := &notifierPlugin{
  107. config: config,
  108. queue: &eventsQueue{},
  109. }
  110. if err := p.initialize(); err != nil {
  111. logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
  112. return nil, err
  113. }
  114. return p, nil
  115. }
  116. func (p *notifierPlugin) exited() bool {
  117. return p.client.Exited()
  118. }
  119. func (p *notifierPlugin) cleanup() {
  120. p.client.Kill()
  121. }
  122. func (p *notifierPlugin) initialize() error {
  123. killProcess(p.config.Cmd)
  124. logger.Debug(logSender, "", "create new notifier plugin %#v", p.config.Cmd)
  125. if !p.config.NotifierOptions.hasActions() {
  126. return fmt.Errorf("no actions defined for the notifier plugin %#v", p.config.Cmd)
  127. }
  128. var secureConfig *plugin.SecureConfig
  129. if p.config.SHA256Sum != "" {
  130. secureConfig.Checksum = []byte(p.config.SHA256Sum)
  131. secureConfig.Hash = sha256.New()
  132. }
  133. client := plugin.NewClient(&plugin.ClientConfig{
  134. HandshakeConfig: notifier.Handshake,
  135. Plugins: notifier.PluginMap,
  136. Cmd: exec.Command(p.config.Cmd, p.config.Args...),
  137. AllowedProtocols: []plugin.Protocol{
  138. plugin.ProtocolGRPC,
  139. },
  140. AutoMTLS: p.config.AutoMTLS,
  141. SecureConfig: secureConfig,
  142. Managed: false,
  143. Logger: &logger.HCLogAdapter{
  144. Logger: hclog.New(&hclog.LoggerOptions{
  145. Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName),
  146. Level: pluginsLogLevel,
  147. DisableTime: true,
  148. }),
  149. },
  150. })
  151. rpcClient, err := client.Client()
  152. if err != nil {
  153. logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
  154. return err
  155. }
  156. raw, err := rpcClient.Dispense(notifier.PluginName)
  157. if err != nil {
  158. logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
  159. notifier.PluginName, p.config.Cmd, err)
  160. return err
  161. }
  162. p.client = client
  163. p.notifier = raw.(notifier.Notifier)
  164. return nil
  165. }
  166. func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
  167. if p.config.NotifierOptions.RetryMaxTime == 0 {
  168. return false
  169. }
  170. if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
  171. return false
  172. }
  173. if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
  174. return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
  175. }
  176. return true
  177. }
  178. func (p *notifierPlugin) notifyFsAction(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
  179. protocol, ip, virtualPath, virtualTargetPath, sessionID string, fileSize int64, errAction error) {
  180. if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) {
  181. return
  182. }
  183. go func() {
  184. status := 1
  185. if errAction != nil {
  186. status = 0
  187. }
  188. p.sendFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, virtualPath, virtualTargetPath,
  189. sessionID, fileSize, status)
  190. }()
  191. }
  192. func (p *notifierPlugin) notifyProviderAction(timestamp int64, action, username, objectType, objectName, ip string,
  193. object Renderer,
  194. ) {
  195. if !util.IsStringInSlice(action, p.config.NotifierOptions.ProviderEvents) ||
  196. !util.IsStringInSlice(objectType, p.config.NotifierOptions.ProviderObjects) {
  197. return
  198. }
  199. go func() {
  200. objectAsJSON, err := object.RenderAsJSON(action != "delete")
  201. if err != nil {
  202. logger.Warn(logSender, "", "unable to render user as json for action %v: %v", action, err)
  203. return
  204. }
  205. p.sendProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
  206. }()
  207. }
  208. func (p *notifierPlugin) sendFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
  209. protocol, ip, virtualPath, virtualTargetPath, sessionID string, fileSize int64, status int) {
  210. if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
  211. virtualPath, virtualTargetPath, sessionID, fileSize, status); err != nil {
  212. logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
  213. if p.canQueueEvent(timestamp) {
  214. p.queue.addFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, fileSize, status)
  215. }
  216. }
  217. }
  218. func (p *notifierPlugin) sendProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
  219. objectAsJSON []byte,
  220. ) {
  221. if err := p.notifier.NotifyProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON); err != nil {
  222. logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
  223. if p.canQueueEvent(timestamp) {
  224. p.queue.addProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
  225. }
  226. }
  227. }
  228. func (p *notifierPlugin) sendQueuedEvents() {
  229. queueSize := p.queue.getSize()
  230. if queueSize == 0 {
  231. return
  232. }
  233. logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
  234. fsEv := p.queue.popFsEvent()
  235. for fsEv != nil {
  236. go p.sendFsEvent(fsEv.Timestamp, fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
  237. fsEv.SshCmd, fsEv.Protocol, fsEv.Ip, fsEv.VirtualPath, fsEv.VirtualTargetPath, fsEv.SessionId,
  238. fsEv.FileSize, int(fsEv.Status))
  239. fsEv = p.queue.popFsEvent()
  240. }
  241. providerEv := p.queue.popProviderEvent()
  242. for providerEv != nil {
  243. go p.sendProviderEvent(providerEv.Timestamp, providerEv.Action, providerEv.Username, providerEv.ObjectType,
  244. providerEv.ObjectName, providerEv.Ip, providerEv.ObjectData)
  245. providerEv = p.queue.popProviderEvent()
  246. }
  247. logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
  248. }