notifier.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package plugin
  15. import (
  16. "fmt"
  17. "os/exec"
  18. "sync"
  19. "time"
  20. "github.com/hashicorp/go-hclog"
  21. "github.com/hashicorp/go-plugin"
  22. "github.com/sftpgo/sdk/plugin/notifier"
  23. "github.com/drakkan/sftpgo/v2/internal/logger"
  24. "github.com/drakkan/sftpgo/v2/internal/util"
  25. )
  // NotifierConfig defines the configuration parameters for notifier plugins.
  type NotifierConfig struct {
  	// FsEvents lists the filesystem actions the plugin wants to be notified
  	// about; an fs event is forwarded only if its action is in this list.
  	FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
  	// ProviderEvents lists the provider actions the plugin wants to be
  	// notified about. It is only effective together with ProviderObjects.
  	ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
  	// ProviderObjects lists the provider object types the plugin wants to be
  	// notified about. It is only effective together with ProviderEvents.
  	ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
  	// RetryMaxTime is the number of seconds, counted from the event
  	// timestamp, during which a failed notification may be queued for a
  	// retry. 0 disables queuing.
  	RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
  	// RetryQueueMaxSize caps the number of events waiting for a retry.
  	// A value that is not positive means no cap.
  	RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
  }

  // hasActions reports whether the configuration subscribes to at least one
  // kind of notification: either some filesystem events, or provider events
  // together with the provider object types they apply to.
  func (c *NotifierConfig) hasActions() bool {
  	wantsFs := len(c.FsEvents) > 0
  	wantsProvider := len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0
  	return wantsFs || wantsProvider
  }
  43. type eventsQueue struct {
  44. sync.RWMutex
  45. fsEvents []*notifier.FsEvent
  46. providerEvents []*notifier.ProviderEvent
  47. }
  48. func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
  49. q.Lock()
  50. defer q.Unlock()
  51. q.fsEvents = append(q.fsEvents, event)
  52. }
  53. func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
  54. q.Lock()
  55. defer q.Unlock()
  56. q.providerEvents = append(q.providerEvents, event)
  57. }
  58. func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
  59. q.Lock()
  60. defer q.Unlock()
  61. if len(q.fsEvents) == 0 {
  62. return nil
  63. }
  64. truncLen := len(q.fsEvents) - 1
  65. ev := q.fsEvents[truncLen]
  66. q.fsEvents[truncLen] = nil
  67. q.fsEvents = q.fsEvents[:truncLen]
  68. return ev
  69. }
  70. func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
  71. q.Lock()
  72. defer q.Unlock()
  73. if len(q.providerEvents) == 0 {
  74. return nil
  75. }
  76. truncLen := len(q.providerEvents) - 1
  77. ev := q.providerEvents[truncLen]
  78. q.providerEvents[truncLen] = nil
  79. q.providerEvents = q.providerEvents[:truncLen]
  80. return ev
  81. }
  82. func (q *eventsQueue) getSize() int {
  83. q.RLock()
  84. defer q.RUnlock()
  85. return len(q.providerEvents) + len(q.fsEvents)
  86. }
  87. type notifierPlugin struct {
  88. config Config
  89. notifier notifier.Notifier
  90. client *plugin.Client
  91. queue *eventsQueue
  92. }
  93. func newNotifierPlugin(config Config) (*notifierPlugin, error) {
  94. p := &notifierPlugin{
  95. config: config,
  96. queue: &eventsQueue{},
  97. }
  98. if err := p.initialize(); err != nil {
  99. logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
  100. return nil, err
  101. }
  102. return p, nil
  103. }
  104. func (p *notifierPlugin) exited() bool {
  105. return p.client.Exited()
  106. }
  107. func (p *notifierPlugin) cleanup() {
  108. p.client.Kill()
  109. }
  110. func (p *notifierPlugin) initialize() error {
  111. killProcess(p.config.Cmd)
  112. logger.Debug(logSender, "", "create new notifier plugin %#v", p.config.Cmd)
  113. if !p.config.NotifierOptions.hasActions() {
  114. return fmt.Errorf("no actions defined for the notifier plugin %#v", p.config.Cmd)
  115. }
  116. secureConfig, err := p.config.getSecureConfig()
  117. if err != nil {
  118. return err
  119. }
  120. client := plugin.NewClient(&plugin.ClientConfig{
  121. HandshakeConfig: notifier.Handshake,
  122. Plugins: notifier.PluginMap,
  123. Cmd: exec.Command(p.config.Cmd, p.config.Args...),
  124. AllowedProtocols: []plugin.Protocol{
  125. plugin.ProtocolGRPC,
  126. },
  127. AutoMTLS: p.config.AutoMTLS,
  128. SecureConfig: secureConfig,
  129. Managed: false,
  130. Logger: &logger.HCLogAdapter{
  131. Logger: hclog.New(&hclog.LoggerOptions{
  132. Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName),
  133. Level: pluginsLogLevel,
  134. DisableTime: true,
  135. }),
  136. },
  137. })
  138. rpcClient, err := client.Client()
  139. if err != nil {
  140. logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
  141. return err
  142. }
  143. raw, err := rpcClient.Dispense(notifier.PluginName)
  144. if err != nil {
  145. logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
  146. notifier.PluginName, p.config.Cmd, err)
  147. return err
  148. }
  149. p.client = client
  150. p.notifier = raw.(notifier.Notifier)
  151. return nil
  152. }
  153. func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
  154. if p.config.NotifierOptions.RetryMaxTime == 0 {
  155. return false
  156. }
  157. if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
  158. logger.Warn(logSender, "", "dropping too late event for plugin %v, event timestamp: %v",
  159. p.config.Cmd, time.Unix(0, timestamp))
  160. return false
  161. }
  162. if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
  163. return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
  164. }
  165. return true
  166. }
  167. func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
  168. if !util.Contains(p.config.NotifierOptions.FsEvents, event.Action) {
  169. return
  170. }
  171. go func() {
  172. Handler.addTask()
  173. defer Handler.removeTask()
  174. p.sendFsEvent(event)
  175. }()
  176. }
  177. func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
  178. if !util.Contains(p.config.NotifierOptions.ProviderEvents, event.Action) ||
  179. !util.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) {
  180. return
  181. }
  182. go func() {
  183. Handler.addTask()
  184. defer Handler.removeTask()
  185. objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
  186. if err != nil {
  187. logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err)
  188. return
  189. }
  190. event.ObjectData = objectAsJSON
  191. p.sendProviderEvent(event)
  192. }()
  193. }
  194. func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
  195. if err := p.notifier.NotifyFsEvent(event); err != nil {
  196. logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
  197. if p.canQueueEvent(event.Timestamp) {
  198. p.queue.addFsEvent(event)
  199. }
  200. }
  201. }
  202. func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) {
  203. if err := p.notifier.NotifyProviderEvent(event); err != nil {
  204. logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
  205. if p.canQueueEvent(event.Timestamp) {
  206. p.queue.addProviderEvent(event)
  207. }
  208. }
  209. }
  210. func (p *notifierPlugin) sendQueuedEvents() {
  211. queueSize := p.queue.getSize()
  212. if queueSize == 0 {
  213. return
  214. }
  215. logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
  216. fsEv := p.queue.popFsEvent()
  217. for fsEv != nil {
  218. go func(ev *notifier.FsEvent) {
  219. p.sendFsEvent(ev)
  220. }(fsEv)
  221. fsEv = p.queue.popFsEvent()
  222. }
  223. providerEv := p.queue.popProviderEvent()
  224. for providerEv != nil {
  225. go func(ev *notifier.ProviderEvent) {
  226. p.sendProviderEvent(ev)
  227. }(providerEv)
  228. providerEv = p.queue.popProviderEvent()
  229. }
  230. logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
  231. }