plugin.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package plugin provides support for the SFTPGo plugin system
  15. package plugin
  16. import (
  17. "crypto/sha256"
  18. "crypto/x509"
  19. "encoding/hex"
  20. "errors"
  21. "fmt"
  22. "os"
  23. "os/exec"
  24. "path/filepath"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "github.com/hashicorp/go-hclog"
  30. "github.com/hashicorp/go-plugin"
  31. "github.com/sftpgo/sdk/plugin/auth"
  32. "github.com/sftpgo/sdk/plugin/eventsearcher"
  33. "github.com/sftpgo/sdk/plugin/ipfilter"
  34. kmsplugin "github.com/sftpgo/sdk/plugin/kms"
  35. "github.com/sftpgo/sdk/plugin/notifier"
  36. "github.com/drakkan/sftpgo/v2/internal/kms"
  37. "github.com/drakkan/sftpgo/v2/internal/logger"
  38. "github.com/drakkan/sftpgo/v2/internal/util"
  39. )
  40. const (
  41. logSender = "plugins"
  42. )
  43. var (
  44. // Handler defines the plugins manager
  45. Handler Manager
  46. pluginsLogLevel = hclog.Debug
  47. // ErrNoSearcher defines the error to return for events searches if no plugin is configured
  48. ErrNoSearcher = errors.New("no events searcher plugin defined")
  49. )
  50. // Renderer defines the interface for generic objects rendering
  51. type Renderer interface {
  52. RenderAsJSON(reload bool) ([]byte, error)
  53. }
  54. // Config defines a plugin configuration
  55. type Config struct {
  56. // Plugin type
  57. Type string `json:"type" mapstructure:"type"`
  58. // NotifierOptions defines options for notifiers plugins
  59. NotifierOptions NotifierConfig `json:"notifier_options" mapstructure:"notifier_options"`
  60. // KMSOptions defines options for a KMS plugin
  61. KMSOptions KMSConfig `json:"kms_options" mapstructure:"kms_options"`
  62. // AuthOptions defines options for authentication plugins
  63. AuthOptions AuthConfig `json:"auth_options" mapstructure:"auth_options"`
  64. // Path to the plugin executable
  65. Cmd string `json:"cmd" mapstructure:"cmd"`
  66. // Args to pass to the plugin executable
  67. Args []string `json:"args" mapstructure:"args"`
  68. // SHA256 checksum for the plugin executable.
  69. // If not empty it will be used to verify the integrity of the executable
  70. SHA256Sum string `json:"sha256sum" mapstructure:"sha256sum"`
  71. // If enabled the client and the server automatically negotiate mTLS for
  72. // transport authentication. This ensures that only the original client will
  73. // be allowed to connect to the server, and all other connections will be
  74. // rejected. The client will also refuse to connect to any server that isn't
  75. // the original instance started by the client.
  76. AutoMTLS bool `json:"auto_mtls" mapstructure:"auto_mtls"`
  77. // EnvPrefix defines the prefix for env vars to pass from the SFTPGo process
  78. // environment to the plugin. Set to "none" to not pass any environment
  79. // variable, set to "*" to pass all environment variables. If empty, the
  80. // prefix is returned as the plugin name in uppercase with "-" replaced with
  81. // "_" and a trailing "_". For example if the plugin name is
  82. // sftpgo-plugin-eventsearch the prefix will be SFTPGO_PLUGIN_EVENTSEARCH_
  83. EnvPrefix string `json:"env_prefix" mapstructure:"env_prefix"`
  84. // Additional environment variable names to pass from the SFTPGo process
  85. // environment to the plugin.
  86. EnvVars []string `json:"env_vars" mapstructure:"env_vars"`
  87. // unique identifier for kms plugins
  88. kmsID int
  89. }
  90. func (c *Config) getSecureConfig() (*plugin.SecureConfig, error) {
  91. if c.SHA256Sum != "" {
  92. checksum, err := hex.DecodeString(c.SHA256Sum)
  93. if err != nil {
  94. return nil, fmt.Errorf("invalid sha256 hash %q: %w", c.SHA256Sum, err)
  95. }
  96. return &plugin.SecureConfig{
  97. Checksum: checksum,
  98. Hash: sha256.New(),
  99. }, nil
  100. }
  101. return nil, nil
  102. }
  103. func (c *Config) getEnvVarPrefix() string {
  104. if c.EnvPrefix == "none" {
  105. return ""
  106. }
  107. if c.EnvPrefix != "" {
  108. return c.EnvPrefix
  109. }
  110. prefix := strings.ToUpper(filepath.Base(c.Cmd)) + "_"
  111. return strings.ReplaceAll(prefix, "-", "_")
  112. }
  113. func (c *Config) getCommand() *exec.Cmd {
  114. cmd := exec.Command(c.Cmd, c.Args...)
  115. cmd.Env = []string{}
  116. if envVarPrefix := c.getEnvVarPrefix(); envVarPrefix != "" {
  117. if envVarPrefix == "*" {
  118. logger.Debug(logSender, "", "sharing all the environment variables with plugin %q", c.Cmd)
  119. cmd.Env = append(cmd.Env, os.Environ()...)
  120. return cmd
  121. }
  122. logger.Debug(logSender, "", "adding env vars with prefix %q for plugin %q", envVarPrefix, c.Cmd)
  123. for _, val := range os.Environ() {
  124. if strings.HasPrefix(val, envVarPrefix) {
  125. cmd.Env = append(cmd.Env, val)
  126. }
  127. }
  128. }
  129. logger.Debug(logSender, "", "additional env vars for plugin %q: %+v", c.Cmd, c.EnvVars)
  130. for _, key := range c.EnvVars {
  131. cmd.Env = append(cmd.Env, os.Getenv(key))
  132. }
  133. return cmd
  134. }
  135. func (c *Config) newKMSPluginSecretProvider(base kms.BaseSecret, url, masterKey string) kms.SecretProvider {
  136. return &kmsPluginSecretProvider{
  137. BaseSecret: base,
  138. URL: url,
  139. MasterKey: masterKey,
  140. config: c,
  141. }
  142. }
  143. // Manager handles enabled plugins
  144. type Manager struct {
  145. closed atomic.Bool
  146. done chan bool
  147. // List of configured plugins
  148. Configs []Config `json:"plugins" mapstructure:"plugins"`
  149. notifLock sync.RWMutex
  150. notifiers []*notifierPlugin
  151. kmsLock sync.RWMutex
  152. kms []*kmsPlugin
  153. authLock sync.RWMutex
  154. auths []*authPlugin
  155. searcherLock sync.RWMutex
  156. searcher *searcherPlugin
  157. ipFilterLock sync.RWMutex
  158. filter *ipFilterPlugin
  159. authScopes int
  160. hasSearcher bool
  161. hasNotifiers bool
  162. hasAuths bool
  163. hasIPFilter bool
  164. concurrencyGuard chan struct{}
  165. }
  166. // Initialize initializes the configured plugins
  167. func Initialize(configs []Config, logLevel string) error {
  168. logger.Debug(logSender, "", "initialize")
  169. Handler = Manager{
  170. Configs: configs,
  171. done: make(chan bool),
  172. authScopes: -1,
  173. concurrencyGuard: make(chan struct{}, 250),
  174. }
  175. Handler.closed.Store(false)
  176. setLogLevel(logLevel)
  177. if len(configs) == 0 {
  178. return nil
  179. }
  180. if err := Handler.validateConfigs(); err != nil {
  181. return err
  182. }
  183. if err := initializePlugins(); err != nil {
  184. return err
  185. }
  186. startCheckTicker()
  187. return nil
  188. }
  189. func initializePlugins() error {
  190. kmsID := 0
  191. for idx, config := range Handler.Configs {
  192. switch config.Type {
  193. case notifier.PluginName:
  194. plugin, err := newNotifierPlugin(config)
  195. if err != nil {
  196. return err
  197. }
  198. Handler.notifiers = append(Handler.notifiers, plugin)
  199. case kmsplugin.PluginName:
  200. plugin, err := newKMSPlugin(config)
  201. if err != nil {
  202. return err
  203. }
  204. Handler.kms = append(Handler.kms, plugin)
  205. Handler.Configs[idx].kmsID = kmsID
  206. kmsID++
  207. kms.RegisterSecretProvider(config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus,
  208. Handler.Configs[idx].newKMSPluginSecretProvider)
  209. logger.Info(logSender, "", "registered secret provider for scheme: %v, encrypted status: %v",
  210. config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus)
  211. case auth.PluginName:
  212. plugin, err := newAuthPlugin(config)
  213. if err != nil {
  214. return err
  215. }
  216. Handler.auths = append(Handler.auths, plugin)
  217. if Handler.authScopes == -1 {
  218. Handler.authScopes = config.AuthOptions.Scope
  219. } else {
  220. Handler.authScopes |= config.AuthOptions.Scope
  221. }
  222. case eventsearcher.PluginName:
  223. plugin, err := newSearcherPlugin(config)
  224. if err != nil {
  225. return err
  226. }
  227. Handler.searcher = plugin
  228. case ipfilter.PluginName:
  229. plugin, err := newIPFilterPlugin(config)
  230. if err != nil {
  231. return err
  232. }
  233. Handler.filter = plugin
  234. default:
  235. return fmt.Errorf("unsupported plugin type: %v", config.Type)
  236. }
  237. }
  238. return nil
  239. }
  240. func (m *Manager) validateConfigs() error {
  241. kmsSchemes := make(map[string]bool)
  242. kmsEncryptions := make(map[string]bool)
  243. m.hasSearcher = false
  244. m.hasNotifiers = false
  245. m.hasAuths = false
  246. m.hasIPFilter = false
  247. for _, config := range m.Configs {
  248. switch config.Type {
  249. case kmsplugin.PluginName:
  250. if _, ok := kmsSchemes[config.KMSOptions.Scheme]; ok {
  251. return fmt.Errorf("invalid KMS configuration, duplicated scheme %q", config.KMSOptions.Scheme)
  252. }
  253. if _, ok := kmsEncryptions[config.KMSOptions.EncryptedStatus]; ok {
  254. return fmt.Errorf("invalid KMS configuration, duplicated encrypted status %q", config.KMSOptions.EncryptedStatus)
  255. }
  256. kmsSchemes[config.KMSOptions.Scheme] = true
  257. kmsEncryptions[config.KMSOptions.EncryptedStatus] = true
  258. case eventsearcher.PluginName:
  259. if m.hasSearcher {
  260. return errors.New("only one eventsearcher plugin can be defined")
  261. }
  262. m.hasSearcher = true
  263. case notifier.PluginName:
  264. m.hasNotifiers = true
  265. case auth.PluginName:
  266. m.hasAuths = true
  267. case ipfilter.PluginName:
  268. m.hasIPFilter = true
  269. }
  270. }
  271. return nil
  272. }
  273. // HasAuthenticators returns true if there is at least an auth plugin
  274. func (m *Manager) HasAuthenticators() bool {
  275. return m.hasAuths
  276. }
  277. // HasNotifiers returns true if there is at least a notifier plugin
  278. func (m *Manager) HasNotifiers() bool {
  279. return m.hasNotifiers
  280. }
  281. // NotifyFsEvent sends the fs event notifications using any defined notifier plugins
  282. func (m *Manager) NotifyFsEvent(event *notifier.FsEvent) {
  283. m.notifLock.RLock()
  284. defer m.notifLock.RUnlock()
  285. for _, n := range m.notifiers {
  286. n.notifyFsAction(event)
  287. }
  288. }
  289. // NotifyProviderEvent sends the provider event notifications using any defined notifier plugins
  290. func (m *Manager) NotifyProviderEvent(event *notifier.ProviderEvent, object Renderer) {
  291. m.notifLock.RLock()
  292. defer m.notifLock.RUnlock()
  293. for _, n := range m.notifiers {
  294. n.notifyProviderAction(event, object)
  295. }
  296. }
  297. // NotifyLogEvent sends the log event notifications using any defined notifier plugins
  298. func (m *Manager) NotifyLogEvent(event notifier.LogEventType, protocol, username, ip, role string, err error) {
  299. if !m.hasNotifiers {
  300. return
  301. }
  302. m.notifLock.RLock()
  303. defer m.notifLock.RUnlock()
  304. var e *notifier.LogEvent
  305. for _, n := range m.notifiers {
  306. if util.Contains(n.config.NotifierOptions.LogEvents, int(event)) {
  307. if e == nil {
  308. message := ""
  309. if err != nil {
  310. message = err.Error()
  311. }
  312. e = &notifier.LogEvent{
  313. Timestamp: time.Now().UnixNano(),
  314. Event: event,
  315. Protocol: protocol,
  316. Username: username,
  317. IP: ip,
  318. Message: message,
  319. Role: role,
  320. }
  321. }
  322. n.notifyLogEvent(e)
  323. }
  324. }
  325. }
  326. // HasSearcher returns true if an event searcher plugin is defined
  327. func (m *Manager) HasSearcher() bool {
  328. return m.hasSearcher
  329. }
  330. // SearchFsEvents returns the filesystem events matching the specified filters
  331. func (m *Manager) SearchFsEvents(searchFilters *eventsearcher.FsEventSearch) ([]byte, error) {
  332. if !m.hasSearcher {
  333. return nil, ErrNoSearcher
  334. }
  335. m.searcherLock.RLock()
  336. plugin := m.searcher
  337. m.searcherLock.RUnlock()
  338. return plugin.searchear.SearchFsEvents(searchFilters)
  339. }
  340. // SearchProviderEvents returns the provider events matching the specified filters
  341. func (m *Manager) SearchProviderEvents(searchFilters *eventsearcher.ProviderEventSearch) ([]byte, error) {
  342. if !m.hasSearcher {
  343. return nil, ErrNoSearcher
  344. }
  345. m.searcherLock.RLock()
  346. plugin := m.searcher
  347. m.searcherLock.RUnlock()
  348. return plugin.searchear.SearchProviderEvents(searchFilters)
  349. }
  350. // SearchLogEvents returns the log events matching the specified filters
  351. func (m *Manager) SearchLogEvents(searchFilters *eventsearcher.LogEventSearch) ([]byte, error) {
  352. if !m.hasSearcher {
  353. return nil, ErrNoSearcher
  354. }
  355. m.searcherLock.RLock()
  356. plugin := m.searcher
  357. m.searcherLock.RUnlock()
  358. return plugin.searchear.SearchLogEvents(searchFilters)
  359. }
  360. // IsIPBanned returns true if the IP filter plugin does not allow the specified ip.
  361. // If no IP filter plugin is defined this method returns false
  362. func (m *Manager) IsIPBanned(ip, protocol string) bool {
  363. if !m.hasIPFilter {
  364. return false
  365. }
  366. m.ipFilterLock.RLock()
  367. plugin := m.filter
  368. m.ipFilterLock.RUnlock()
  369. if plugin.exited() {
  370. logger.Warn(logSender, "", "ip filter plugin is not active, cannot check ip %q", ip)
  371. return false
  372. }
  373. return plugin.filter.CheckIP(ip, protocol) != nil
  374. }
  375. // ReloadFilter sends a reload request to the IP filter plugin
  376. func (m *Manager) ReloadFilter() {
  377. if !m.hasIPFilter {
  378. return
  379. }
  380. m.ipFilterLock.RLock()
  381. plugin := m.filter
  382. m.ipFilterLock.RUnlock()
  383. if err := plugin.filter.Reload(); err != nil {
  384. logger.Error(logSender, "", "unable to reload IP filter plugin: %v", err)
  385. }
  386. }
  387. func (m *Manager) kmsEncrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, string, int32, error) {
  388. m.kmsLock.RLock()
  389. plugin := m.kms[kmsID]
  390. m.kmsLock.RUnlock()
  391. return plugin.Encrypt(secret, url, masterKey)
  392. }
  393. func (m *Manager) kmsDecrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, error) {
  394. m.kmsLock.RLock()
  395. plugin := m.kms[kmsID]
  396. m.kmsLock.RUnlock()
  397. return plugin.Decrypt(secret, url, masterKey)
  398. }
  399. // HasAuthScope returns true if there is an auth plugin that support the specified scope
  400. func (m *Manager) HasAuthScope(scope int) bool {
  401. if m.authScopes == -1 {
  402. return false
  403. }
  404. return m.authScopes&scope != 0
  405. }
  406. // Authenticate tries to authenticate the specified user using an external plugin
  407. func (m *Manager) Authenticate(username, password, ip, protocol string, pkey string,
  408. tlsCert *x509.Certificate, authScope int, userAsJSON []byte,
  409. ) ([]byte, error) {
  410. switch authScope {
  411. case AuthScopePassword:
  412. return m.checkUserAndPass(username, password, ip, protocol, userAsJSON)
  413. case AuthScopePublicKey:
  414. return m.checkUserAndPublicKey(username, pkey, ip, protocol, userAsJSON)
  415. case AuthScopeKeyboardInteractive:
  416. return m.checkUserAndKeyboardInteractive(username, ip, protocol, userAsJSON)
  417. case AuthScopeTLSCertificate:
  418. cert, err := util.EncodeTLSCertToPem(tlsCert)
  419. if err != nil {
  420. logger.Error(logSender, "", "unable to encode tls certificate to pem: %v", err)
  421. return nil, fmt.Errorf("unable to encode tls cert to pem: %w", err)
  422. }
  423. return m.checkUserAndTLSCert(username, cert, ip, protocol, userAsJSON)
  424. default:
  425. return nil, fmt.Errorf("unsupported auth scope: %v", authScope)
  426. }
  427. }
  428. // ExecuteKeyboardInteractiveStep executes a keyboard interactive step
  429. func (m *Manager) ExecuteKeyboardInteractiveStep(req *KeyboardAuthRequest) (*KeyboardAuthResponse, error) {
  430. var plugin *authPlugin
  431. m.authLock.Lock()
  432. for _, p := range m.auths {
  433. if p.config.AuthOptions.Scope&AuthScopePassword != 0 {
  434. plugin = p
  435. break
  436. }
  437. }
  438. m.authLock.Unlock()
  439. if plugin == nil {
  440. return nil, errors.New("no auth plugin configured for keyaboard interactive authentication step")
  441. }
  442. return plugin.sendKeyboardIteractiveRequest(req)
  443. }
  444. func (m *Manager) checkUserAndPass(username, password, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  445. var plugin *authPlugin
  446. m.authLock.Lock()
  447. for _, p := range m.auths {
  448. if p.config.AuthOptions.Scope&AuthScopePassword != 0 {
  449. plugin = p
  450. break
  451. }
  452. }
  453. m.authLock.Unlock()
  454. if plugin == nil {
  455. return nil, errors.New("no auth plugin configured for password checking")
  456. }
  457. return plugin.checkUserAndPass(username, password, ip, protocol, userAsJSON)
  458. }
  459. func (m *Manager) checkUserAndPublicKey(username, pubKey, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  460. var plugin *authPlugin
  461. m.authLock.Lock()
  462. for _, p := range m.auths {
  463. if p.config.AuthOptions.Scope&AuthScopePublicKey != 0 {
  464. plugin = p
  465. break
  466. }
  467. }
  468. m.authLock.Unlock()
  469. if plugin == nil {
  470. return nil, errors.New("no auth plugin configured for public key checking")
  471. }
  472. return plugin.checkUserAndPublicKey(username, pubKey, ip, protocol, userAsJSON)
  473. }
  474. func (m *Manager) checkUserAndTLSCert(username, tlsCert, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  475. var plugin *authPlugin
  476. m.authLock.Lock()
  477. for _, p := range m.auths {
  478. if p.config.AuthOptions.Scope&AuthScopeTLSCertificate != 0 {
  479. plugin = p
  480. break
  481. }
  482. }
  483. m.authLock.Unlock()
  484. if plugin == nil {
  485. return nil, errors.New("no auth plugin configured for TLS certificate checking")
  486. }
  487. return plugin.checkUserAndTLSCertificate(username, tlsCert, ip, protocol, userAsJSON)
  488. }
  489. func (m *Manager) checkUserAndKeyboardInteractive(username, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  490. var plugin *authPlugin
  491. m.authLock.Lock()
  492. for _, p := range m.auths {
  493. if p.config.AuthOptions.Scope&AuthScopeKeyboardInteractive != 0 {
  494. plugin = p
  495. break
  496. }
  497. }
  498. m.authLock.Unlock()
  499. if plugin == nil {
  500. return nil, errors.New("no auth plugin configured for keyboard interactive checking")
  501. }
  502. return plugin.checkUserAndKeyboardInteractive(username, ip, protocol, userAsJSON)
  503. }
  504. func (m *Manager) checkCrashedPlugins() {
  505. m.notifLock.RLock()
  506. for idx, n := range m.notifiers {
  507. if n.exited() {
  508. defer func(cfg Config, index int) {
  509. Handler.restartNotifierPlugin(cfg, index)
  510. }(n.config, idx)
  511. } else {
  512. n.sendQueuedEvents()
  513. }
  514. }
  515. m.notifLock.RUnlock()
  516. m.kmsLock.RLock()
  517. for idx, k := range m.kms {
  518. if k.exited() {
  519. defer func(cfg Config, index int) {
  520. Handler.restartKMSPlugin(cfg, index)
  521. }(k.config, idx)
  522. }
  523. }
  524. m.kmsLock.RUnlock()
  525. m.authLock.RLock()
  526. for idx, a := range m.auths {
  527. if a.exited() {
  528. defer func(cfg Config, index int) {
  529. Handler.restartAuthPlugin(cfg, index)
  530. }(a.config, idx)
  531. }
  532. }
  533. m.authLock.RUnlock()
  534. if m.hasSearcher {
  535. m.searcherLock.RLock()
  536. if m.searcher.exited() {
  537. defer func(cfg Config) {
  538. Handler.restartSearcherPlugin(cfg)
  539. }(m.searcher.config)
  540. }
  541. m.searcherLock.RUnlock()
  542. }
  543. if m.hasIPFilter {
  544. m.ipFilterLock.RLock()
  545. if m.filter.exited() {
  546. defer func(cfg Config) {
  547. Handler.restartIPFilterPlugin(cfg)
  548. }(m.filter.config)
  549. }
  550. m.ipFilterLock.RUnlock()
  551. }
  552. }
  553. func (m *Manager) restartNotifierPlugin(config Config, idx int) {
  554. if m.closed.Load() {
  555. return
  556. }
  557. logger.Info(logSender, "", "try to restart crashed notifier plugin %q, idx: %v", config.Cmd, idx)
  558. plugin, err := newNotifierPlugin(config)
  559. if err != nil {
  560. logger.Error(logSender, "", "unable to restart notifier plugin %q, err: %v", config.Cmd, err)
  561. return
  562. }
  563. m.notifLock.Lock()
  564. plugin.queue = m.notifiers[idx].queue
  565. m.notifiers[idx] = plugin
  566. m.notifLock.Unlock()
  567. plugin.sendQueuedEvents()
  568. }
  569. func (m *Manager) restartKMSPlugin(config Config, idx int) {
  570. if m.closed.Load() {
  571. return
  572. }
  573. logger.Info(logSender, "", "try to restart crashed kms plugin %q, idx: %v", config.Cmd, idx)
  574. plugin, err := newKMSPlugin(config)
  575. if err != nil {
  576. logger.Error(logSender, "", "unable to restart kms plugin %q, err: %v", config.Cmd, err)
  577. return
  578. }
  579. m.kmsLock.Lock()
  580. m.kms[idx] = plugin
  581. m.kmsLock.Unlock()
  582. }
  583. func (m *Manager) restartAuthPlugin(config Config, idx int) {
  584. if m.closed.Load() {
  585. return
  586. }
  587. logger.Info(logSender, "", "try to restart crashed auth plugin %q, idx: %v", config.Cmd, idx)
  588. plugin, err := newAuthPlugin(config)
  589. if err != nil {
  590. logger.Error(logSender, "", "unable to restart auth plugin %q, err: %v", config.Cmd, err)
  591. return
  592. }
  593. m.authLock.Lock()
  594. m.auths[idx] = plugin
  595. m.authLock.Unlock()
  596. }
  597. func (m *Manager) restartSearcherPlugin(config Config) {
  598. if m.closed.Load() {
  599. return
  600. }
  601. logger.Info(logSender, "", "try to restart crashed searcher plugin %q", config.Cmd)
  602. plugin, err := newSearcherPlugin(config)
  603. if err != nil {
  604. logger.Error(logSender, "", "unable to restart searcher plugin %q, err: %v", config.Cmd, err)
  605. return
  606. }
  607. m.searcherLock.Lock()
  608. m.searcher = plugin
  609. m.searcherLock.Unlock()
  610. }
  611. func (m *Manager) restartIPFilterPlugin(config Config) {
  612. if m.closed.Load() {
  613. return
  614. }
  615. logger.Info(logSender, "", "try to restart crashed IP filter plugin %q", config.Cmd)
  616. plugin, err := newIPFilterPlugin(config)
  617. if err != nil {
  618. logger.Error(logSender, "", "unable to restart IP filter plugin %q, err: %v", config.Cmd, err)
  619. return
  620. }
  621. m.ipFilterLock.Lock()
  622. m.filter = plugin
  623. m.ipFilterLock.Unlock()
  624. }
  625. func (m *Manager) addTask() {
  626. m.concurrencyGuard <- struct{}{}
  627. }
  628. func (m *Manager) removeTask() {
  629. <-m.concurrencyGuard
  630. }
  631. // Cleanup releases all the active plugins
  632. func (m *Manager) Cleanup() {
  633. if m.closed.Swap(true) {
  634. return
  635. }
  636. logger.Debug(logSender, "", "cleanup")
  637. close(m.done)
  638. m.notifLock.Lock()
  639. for _, n := range m.notifiers {
  640. logger.Debug(logSender, "", "cleanup notifier plugin %v", n.config.Cmd)
  641. n.cleanup()
  642. }
  643. m.notifLock.Unlock()
  644. m.kmsLock.Lock()
  645. for _, k := range m.kms {
  646. logger.Debug(logSender, "", "cleanup kms plugin %v", k.config.Cmd)
  647. k.cleanup()
  648. }
  649. m.kmsLock.Unlock()
  650. m.authLock.Lock()
  651. for _, a := range m.auths {
  652. logger.Debug(logSender, "", "cleanup auth plugin %v", a.config.Cmd)
  653. a.cleanup()
  654. }
  655. m.authLock.Unlock()
  656. if m.hasSearcher {
  657. m.searcherLock.Lock()
  658. logger.Debug(logSender, "", "cleanup searcher plugin %v", m.searcher.config.Cmd)
  659. m.searcher.cleanup()
  660. m.searcherLock.Unlock()
  661. }
  662. if m.hasIPFilter {
  663. m.ipFilterLock.Lock()
  664. logger.Debug(logSender, "", "cleanup IP filter plugin %v", m.filter.config.Cmd)
  665. m.filter.cleanup()
  666. m.ipFilterLock.Unlock()
  667. }
  668. }
  669. func setLogLevel(logLevel string) {
  670. switch logLevel {
  671. case "info":
  672. pluginsLogLevel = hclog.Info
  673. case "warn":
  674. pluginsLogLevel = hclog.Warn
  675. case "error":
  676. pluginsLogLevel = hclog.Error
  677. default:
  678. pluginsLogLevel = hclog.Debug
  679. }
  680. }
  681. func startCheckTicker() {
  682. logger.Debug(logSender, "", "start plugins checker")
  683. go func() {
  684. ticker := time.NewTicker(30 * time.Second)
  685. defer ticker.Stop()
  686. for {
  687. select {
  688. case <-Handler.done:
  689. logger.Debug(logSender, "", "handler done, stop plugins checker")
  690. return
  691. case <-ticker.C:
  692. Handler.checkCrashedPlugins()
  693. }
  694. }
  695. }()
  696. }