guerrilla.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. package guerrilla
  2. import (
  3. "errors"
  4. evbus "github.com/asaskevich/EventBus"
  5. "github.com/flashmob/go-guerrilla/backends"
  6. "github.com/flashmob/go-guerrilla/log"
  7. "sync"
  8. "sync/atomic"
  9. )
  10. const (
  11. // server has just been created
  12. GuerrillaStateNew = iota
  13. // Server has been started and is running
  14. GuerrillaStateStarted
  15. // Server has just been stopped
  16. GuerrillaStateStopped
  17. )
  18. type Errors []error
  19. // implement the Error interface
  20. func (e Errors) Error() string {
  21. if len(e) == 1 {
  22. return e[0].Error()
  23. }
  24. // multiple errors
  25. msg := ""
  26. for _, err := range e {
  27. msg += "\n" + err.Error()
  28. }
  29. return msg
  30. }
  31. type Guerrilla interface {
  32. Start() error
  33. Shutdown()
  34. Subscribe(topic string, fn interface{}) error
  35. Publish(topic string, args ...interface{})
  36. Unsubscribe(topic string, handler interface{}) error
  37. SetLogger(log.Logger)
  38. }
  39. type guerrilla struct {
  40. Config AppConfig
  41. servers map[string]*server
  42. backend backends.Backend
  43. // guard controls access to g.servers
  44. guard sync.Mutex
  45. state int8
  46. bus *evbus.EventBus
  47. mainlog logStore
  48. }
  49. type logStore struct {
  50. atomic.Value
  51. }
  52. // Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
  53. func (ls *logStore) Get() log.Logger {
  54. if v, ok := ls.Load().(log.Logger); ok {
  55. return v
  56. }
  57. l, _ := log.GetLogger(log.OutputStderr.String())
  58. return l
  59. }
  60. // Returns a new instance of Guerrilla with the given config, not yet running.
  61. func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
  62. g := &guerrilla{
  63. Config: *ac, // take a local copy
  64. servers: make(map[string]*server, len(ac.Servers)),
  65. backend: b,
  66. bus: evbus.New(),
  67. }
  68. g.mainlog.Store(l)
  69. if ac.LogLevel != "" {
  70. g.mainlog.Get().SetLevel(ac.LogLevel)
  71. }
  72. g.state = GuerrillaStateNew
  73. err := g.makeServers()
  74. // subscribe for any events that may come in while running
  75. g.subscribeEvents()
  76. return g, err
  77. }
  78. // Instantiate servers
  79. func (g *guerrilla) makeServers() error {
  80. g.mainlog.Get().Debug("making servers")
  81. var errs Errors
  82. for _, sc := range g.Config.Servers {
  83. if _, ok := g.servers[sc.ListenInterface]; ok {
  84. // server already instantiated
  85. continue
  86. }
  87. server, err := newServer(&sc, g.backend, g.mainlog.Get())
  88. if err != nil {
  89. g.mainlog.Get().WithError(err).Errorf("Failed to create server [%s]", sc.ListenInterface)
  90. errs = append(errs, err)
  91. }
  92. if server != nil {
  93. g.servers[sc.ListenInterface] = server
  94. server.setAllowedHosts(g.Config.AllowedHosts)
  95. }
  96. }
  97. if len(g.servers) == 0 {
  98. errs = append(errs, errors.New("There are no servers that can start, please check your config"))
  99. }
  100. if len(errs) == 0 {
  101. return nil
  102. }
  103. return errs
  104. }
  105. // find a server by interface, retuning the index of the config and instance of server
  106. func (g *guerrilla) findServer(iface string) (int, *server) {
  107. g.guard.Lock()
  108. defer g.guard.Unlock()
  109. ret := -1
  110. for i := range g.Config.Servers {
  111. if g.Config.Servers[i].ListenInterface == iface {
  112. server := g.servers[iface]
  113. ret = i
  114. return ret, server
  115. }
  116. }
  117. return ret, nil
  118. }
  119. func (g *guerrilla) removeServer(serverConfigIndex int, iface string) {
  120. g.guard.Lock()
  121. defer g.guard.Unlock()
  122. delete(g.servers, iface)
  123. // cut out from the slice
  124. g.Config.Servers = append(g.Config.Servers[:serverConfigIndex], g.Config.Servers[1:]...)
  125. }
  126. func (g *guerrilla) addServer(sc *ServerConfig) {
  127. g.guard.Lock()
  128. defer g.guard.Unlock()
  129. g.Config.Servers = append(g.Config.Servers, *sc)
  130. g.makeServers()
  131. }
  132. func (g *guerrilla) setConfig(i int, sc *ServerConfig) {
  133. g.guard.Lock()
  134. defer g.guard.Unlock()
  135. g.Config.Servers[i] = *sc
  136. g.servers[sc.ListenInterface].setConfig(sc)
  137. }
  138. // mapServers calls a callback on each server in g.servers map
  139. // It locks the g.servers map before mapping
  140. func (g *guerrilla) mapServers(callback func(*server)) map[string]*server {
  141. defer g.guard.Unlock()
  142. g.guard.Lock()
  143. for _, server := range g.servers {
  144. callback(server)
  145. }
  146. return g.servers
  147. }
  148. // subscribeEvents subscribes event handlers for configuration change events
  149. func (g *guerrilla) subscribeEvents() {
  150. // allowed_hosts changed, set for all servers
  151. g.Subscribe("config_change:allowed_hosts", func(c *AppConfig) {
  152. g.mapServers(func(server *server) {
  153. server.setAllowedHosts(c.AllowedHosts)
  154. })
  155. g.mainlog.Get().Infof("allowed_hosts config changed, a new list was set")
  156. })
  157. // the main log file changed
  158. g.Subscribe("config_change:log_file", func(c *AppConfig) {
  159. var err error
  160. var l log.Logger
  161. if l, err = log.GetLogger(c.LogFile); err == nil {
  162. g.mainlog.Store(l)
  163. g.mapServers(func(server *server) {
  164. server.mainlogStore.Store(l) // it will change to hl on the next accepted client
  165. })
  166. g.mainlog.Get().Infof("main log for new clients changed to to [%s]", c.LogFile)
  167. } else {
  168. g.mainlog.Get().WithError(err).Errorf("main logging change failed [%s]", c.LogFile)
  169. }
  170. })
  171. // re-open the main log file (file not changed)
  172. g.Subscribe("config_change:reopen_log_file", func(c *AppConfig) {
  173. g.mainlog.Get().Reopen()
  174. g.mainlog.Get().Infof("re-opened main log file [%s]", c.LogFile)
  175. })
  176. // when log level changes, apply to mainlog and server logs
  177. g.Subscribe("config_change:log_level", func(c *AppConfig) {
  178. g.mainlog.Get().SetLevel(c.LogLevel)
  179. g.mapServers(func(server *server) {
  180. server.log.SetLevel(c.LogLevel)
  181. })
  182. g.mainlog.Get().Infof("log level changed to [%s]", c.LogLevel)
  183. })
  184. // server config was updated
  185. g.Subscribe("server_change:update_config", func(sc *ServerConfig) {
  186. if i, _ := g.findServer(sc.ListenInterface); i != -1 {
  187. g.setConfig(i, sc)
  188. }
  189. })
  190. // add a new server to the config & start
  191. g.Subscribe("server_change:new_server", func(sc *ServerConfig) {
  192. if i, _ := g.findServer(sc.ListenInterface); i == -1 {
  193. // not found, lets add it
  194. g.addServer(sc)
  195. g.mainlog.Get().Infof("New server added [%s]", sc.ListenInterface)
  196. if g.state == GuerrillaStateStarted {
  197. err := g.Start()
  198. if err != nil {
  199. g.mainlog.Get().WithError(err).Info("Event server_change:new_server returned errors when starting")
  200. }
  201. }
  202. }
  203. })
  204. // start a server that already exists in the config and has been instantiated
  205. g.Subscribe("server_change:start_server", func(sc *ServerConfig) {
  206. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  207. if server.state == ServerStateStopped || server.state == ServerStateNew {
  208. g.mainlog.Get().Infof("Starting server [%s]", server.listenInterface)
  209. err := g.Start()
  210. if err != nil {
  211. g.mainlog.Get().WithError(err).Info("Event server_change:start_server returned errors when starting")
  212. }
  213. }
  214. }
  215. })
  216. // stop running a server
  217. g.Subscribe("server_change:stop_server", func(sc *ServerConfig) {
  218. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  219. if server.state == ServerStateRunning {
  220. server.Shutdown()
  221. g.mainlog.Get().Infof("Server [%s] stopped.", sc.ListenInterface)
  222. }
  223. }
  224. })
  225. // server was removed from config
  226. g.Subscribe("server_change:remove_server", func(sc *ServerConfig) {
  227. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  228. server.Shutdown()
  229. g.removeServer(i, sc.ListenInterface)
  230. g.mainlog.Get().Infof("Server [%s] removed from config, stopped it.", sc.ListenInterface)
  231. }
  232. })
  233. // TLS changes
  234. g.Subscribe("server_change:tls_config", func(sc *ServerConfig) {
  235. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  236. if err := server.configureSSL(); err == nil {
  237. g.mainlog.Get().Infof("Server [%s] new TLS configuration loaded", sc.ListenInterface)
  238. } else {
  239. g.mainlog.Get().WithError(err).Errorf("Server [%s] failed to load the new TLS configuration", sc.ListenInterface)
  240. }
  241. }
  242. })
  243. // when server's timeout change.
  244. g.Subscribe("server_change:timeout", func(sc *ServerConfig) {
  245. g.mapServers(func(server *server) {
  246. server.setTimeout(sc.Timeout)
  247. })
  248. })
  249. // when server's max clients change.
  250. g.Subscribe("server_change:max_clients", func(sc *ServerConfig) {
  251. g.mapServers(func(server *server) {
  252. // TODO resize the pool somehow
  253. })
  254. })
  255. // when a server's log file changes
  256. g.Subscribe("server_change:new_log_file", func(sc *ServerConfig) {
  257. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  258. var err error
  259. var l log.Logger
  260. if l, err = log.GetLogger(sc.LogFile); err == nil {
  261. g.mainlog.Store(l)
  262. server.logStore.Store(l) // it will change to l on the next accepted client
  263. g.mainlog.Get().Infof("Server [%s] changed, new clients will log to: [%s]",
  264. sc.ListenInterface,
  265. sc.LogFile,
  266. )
  267. } else {
  268. g.mainlog.Get().WithError(err).Errorf(
  269. "Server [%s] log change failed to: [%s]",
  270. sc.ListenInterface,
  271. sc.LogFile,
  272. )
  273. }
  274. }
  275. })
  276. // when the daemon caught a sighup
  277. g.Subscribe("server_change:reopen_log_file", func(sc *ServerConfig) {
  278. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  279. server.log.Reopen()
  280. g.mainlog.Get().Infof("Server [%s] re-opened log file [%s]", sc.ListenInterface, sc.LogFile)
  281. }
  282. })
  283. }
  284. // Entry point for the application. Starts all servers.
  285. func (g *guerrilla) Start() error {
  286. var startErrors Errors
  287. g.guard.Lock()
  288. defer func() {
  289. g.state = GuerrillaStateStarted
  290. g.guard.Unlock()
  291. }()
  292. if len(g.servers) == 0 {
  293. return append(startErrors, errors.New("No servers to start, please check the config"))
  294. }
  295. // channel for reading errors
  296. errs := make(chan error, len(g.servers))
  297. var startWG sync.WaitGroup
  298. // start servers, send any errors back to errs channel
  299. for ListenInterface := range g.servers {
  300. if !g.servers[ListenInterface].isEnabled() {
  301. // not enabled
  302. continue
  303. }
  304. if g.servers[ListenInterface].state != ServerStateNew &&
  305. g.servers[ListenInterface].state != ServerStateStopped {
  306. continue
  307. }
  308. startWG.Add(1)
  309. go func(s *server) {
  310. if err := s.Start(&startWG); err != nil {
  311. errs <- err
  312. }
  313. }(g.servers[ListenInterface])
  314. }
  315. // wait for all servers to start (or fail)
  316. startWG.Wait()
  317. // close, then read any errors
  318. close(errs)
  319. for err := range errs {
  320. if err != nil {
  321. startErrors = append(startErrors, err)
  322. }
  323. }
  324. if len(startErrors) > 0 {
  325. return startErrors
  326. } else {
  327. if gw, ok := g.backend.(*backends.BackendGateway); ok {
  328. if gw.State == backends.BackendStateShuttered {
  329. _ = gw.Reinitialize()
  330. }
  331. }
  332. }
  333. return nil
  334. }
  335. func (g *guerrilla) Shutdown() {
  336. g.guard.Lock()
  337. defer func() {
  338. g.state = GuerrillaStateStopped
  339. defer g.guard.Unlock()
  340. }()
  341. for ListenInterface, s := range g.servers {
  342. if s.state == ServerStateRunning {
  343. s.Shutdown()
  344. g.mainlog.Get().Infof("shutdown completed for [%s]", ListenInterface)
  345. }
  346. }
  347. if err := g.backend.Shutdown(); err != nil {
  348. g.mainlog.Get().WithError(err).Warn("Backend failed to shutdown")
  349. } else {
  350. g.mainlog.Get().Infof("Backend shutdown completed")
  351. }
  352. }
  353. func (g *guerrilla) Subscribe(topic string, fn interface{}) error {
  354. return g.bus.Subscribe(topic, fn)
  355. }
  356. func (g *guerrilla) Publish(topic string, args ...interface{}) {
  357. g.bus.Publish(topic, args...)
  358. }
  359. func (g *guerrilla) Unsubscribe(topic string, handler interface{}) error {
  360. return g.bus.Unsubscribe(topic, handler)
  361. }
  362. func (g *guerrilla) SetLogger(l log.Logger) {
  363. g.mainlog.Store(l)
  364. }