guerrilla.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package guerrilla
  2. import (
  3. "errors"
  4. log "github.com/Sirupsen/logrus"
  5. evbus "github.com/asaskevich/EventBus"
  6. "github.com/flashmob/go-guerrilla/backends"
  7. "sync"
  8. )
  // Lifecycle states of the daemon, as stored in guerrilla.state.
  // The constants are left untyped so they can be assigned to that int8 field.
  const (
  	// GuerrillaStateNew is the state right after the instance was created.
  	GuerrillaStateNew = iota
  	// GuerrillaStateStarted is the state once Start has been called.
  	GuerrillaStateStarted
  	// GuerrillaStateStopped is the state once Shutdown has been called.
  	GuerrillaStateStopped
  )
  // Errors bundles several errors so they can travel as one error value.
  type Errors []error

  // Error implements the error interface.
  // A list holding exactly one error yields that error's message unchanged.
  // Any other length yields every message preceded by a newline, so an empty
  // list gives the empty string.
  func (e Errors) Error() string {
  	if len(e) == 1 {
  		return e[0].Error()
  	}
  	// several (or zero) errors: accumulate "\n" + message for each entry
  	var buf []byte
  	for i := range e {
  		buf = append(buf, '\n')
  		buf = append(buf, e[i].Error()...)
  	}
  	return string(buf)
  }
  // Guerrilla is the handle returned by New. It controls the lifecycle of the
  // daemon and gives access to its event bus.
  type Guerrilla interface {
  	// Start starts the servers and reports any errors met while doing so.
  	Start() error
  	// Shutdown stops the running servers and shuts the backend down.
  	Shutdown()

  	// Subscribe forwards to the event bus: fn is subscribed to topic.
  	Subscribe(topic string, fn interface{}) error
  	// Publish forwards to the event bus: topic is published with args.
  	Publish(topic string, args ...interface{})
  	// Unsubscribe forwards to the event bus: handler is unsubscribed from topic.
  	Unsubscribe(topic string, handler interface{}) error
  }
  37. type guerrilla struct {
  38. Config AppConfig
  39. servers map[string]*server
  40. backend backends.Backend
  41. // guard controls access to g.servers
  42. guard sync.Mutex
  43. state int8
  44. bus *evbus.EventBus
  45. }
  46. // Returns a new instance of Guerrilla with the given config, not yet running.
  47. func New(ac *AppConfig, b backends.Backend) (Guerrilla, error) {
  48. g := &guerrilla{
  49. Config: *ac, // take a local copy
  50. servers: make(map[string]*server, len(ac.Servers)),
  51. backend: b,
  52. bus: evbus.New(),
  53. }
  54. g.state = GuerrillaStateNew
  55. err := g.makeServers()
  56. // subscribe for any events that may come in while running
  57. g.subscribeEvents()
  58. return g, err
  59. }
  60. // Instantiate servers
  61. func (g *guerrilla) makeServers() error {
  62. log.Debug("making servers")
  63. var errs Errors
  64. for _, sc := range g.Config.Servers {
  65. if _, ok := g.servers[sc.ListenInterface]; ok {
  66. // server already instantiated
  67. continue
  68. }
  69. server, err := newServer(&sc, g.backend)
  70. if err != nil {
  71. log.WithError(err).Errorf("Failed to create server [%s]", sc.ListenInterface)
  72. errs = append(errs, err)
  73. }
  74. if server != nil {
  75. g.servers[sc.ListenInterface] = server
  76. server.setAllowedHosts(g.Config.AllowedHosts)
  77. }
  78. }
  79. if len(g.servers) == 0 {
  80. errs = append(errs, errors.New("There are no servers that can start, please check your config"))
  81. }
  82. if len(errs) == 0 {
  83. return nil
  84. }
  85. return errs
  86. }
  87. // find a server by interface, retuning the index of the config and instance of server
  88. func (g *guerrilla) findServer(iface string) (int, *server) {
  89. g.guard.Lock()
  90. defer g.guard.Unlock()
  91. ret := -1
  92. for i := range g.Config.Servers {
  93. if g.Config.Servers[i].ListenInterface == iface {
  94. server := g.servers[iface]
  95. ret = i
  96. return ret, server
  97. }
  98. }
  99. return ret, nil
  100. }
  101. func (g *guerrilla) removeServer(serverConfigIndex int, iface string) {
  102. g.guard.Lock()
  103. defer g.guard.Unlock()
  104. delete(g.servers, iface)
  105. // cut out from the slice
  106. g.Config.Servers = append(g.Config.Servers[:serverConfigIndex], g.Config.Servers[1:]...)
  107. }
  108. func (g *guerrilla) addServer(sc *ServerConfig) {
  109. g.guard.Lock()
  110. defer g.guard.Unlock()
  111. g.Config.Servers = append(g.Config.Servers, *sc)
  112. g.makeServers()
  113. }
  114. func (g *guerrilla) setConfig(i int, sc *ServerConfig) {
  115. g.guard.Lock()
  116. defer g.guard.Unlock()
  117. g.Config.Servers[i] = *sc
  118. g.servers[sc.ListenInterface].setConfig(sc)
  119. }
  120. // mapServers calls a callback on each server in g.servers map
  121. // It locks the g.servers map before mapping
  122. func (g *guerrilla) mapServers(callback func(*server)) map[string]*server {
  123. defer g.guard.Unlock()
  124. g.guard.Lock()
  125. for _, server := range g.servers {
  126. callback(server)
  127. }
  128. return g.servers
  129. }
  130. func (g *guerrilla) subscribeEvents() {
  131. // allowed_hosts changed, set for all servers
  132. g.Subscribe("config_change:allowed_hosts", func(c *AppConfig) {
  133. g.mapServers(func(server *server) {
  134. server.setAllowedHosts(c.AllowedHosts)
  135. })
  136. log.Infof("allowed_hosts config changed, a new list was set")
  137. })
  138. // server was removed from config
  139. g.Subscribe("server_change:update_config", func(sc *ServerConfig) {
  140. if i, _ := g.findServer(sc.ListenInterface); i != -1 {
  141. g.setConfig(i, sc)
  142. }
  143. })
  144. // add a new server to the config & start
  145. g.Subscribe("server_change:new_server", func(sc *ServerConfig) {
  146. if i, _ := g.findServer(sc.ListenInterface); i == -1 {
  147. // not found, lets add it
  148. g.addServer(sc)
  149. log.Infof("New server added [%s]", sc.ListenInterface)
  150. if g.state == GuerrillaStateStarted {
  151. err := g.Start()
  152. if err != nil {
  153. log.WithError(err).Info("Event server_change:new_server returned errors when starting")
  154. }
  155. }
  156. }
  157. })
  158. // start a server that already exists in config and has been instantiated
  159. g.Subscribe("server_change:start_server", func(sc *ServerConfig) {
  160. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  161. if server.state == ServerStateStopped || server.state == ServerStateNew {
  162. log.Infof("Starting server [%s]", server.listenInterface)
  163. err := g.Start()
  164. if err != nil {
  165. log.WithError(err).Info("Event server_change:start_server returned errors when starting")
  166. }
  167. }
  168. }
  169. })
  170. // stop running a server
  171. g.Subscribe("server_change:stop_server", func(sc *ServerConfig) {
  172. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  173. if server.state == ServerStateRunning {
  174. server.Shutdown()
  175. log.Infof("Server [%s] stopped.", sc.ListenInterface)
  176. }
  177. }
  178. })
  179. // server was removed from config
  180. g.Subscribe("server_change:remove_server", func(sc *ServerConfig) {
  181. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  182. server.Shutdown()
  183. g.removeServer(i, sc.ListenInterface)
  184. log.Infof("Server [%s] removed from config, stopped it.", sc.ListenInterface)
  185. }
  186. })
  187. // TLS changes
  188. g.Subscribe("server_change:tls_config", func(sc *ServerConfig) {
  189. if i, server := g.findServer(sc.ListenInterface); i != -1 {
  190. if err := server.configureSSL(); err == nil {
  191. log.Infof("Server [%s] new TLS configuration loaded", sc.ListenInterface)
  192. } else {
  193. log.WithError(err).Errorf("Server [%s] failed to load the new TLS configuration", sc.ListenInterface)
  194. }
  195. }
  196. })
  197. // when server's timeout change.
  198. g.Subscribe("server_change:timeout", func(sc *ServerConfig) {
  199. g.mapServers(func(server *server) {
  200. server.setTimeout(sc.Timeout)
  201. })
  202. })
  203. // when server's max clients change.
  204. g.Subscribe("server_change:max_clients", func(sc *ServerConfig) {
  205. g.mapServers(func(server *server) {
  206. // TODO resize the pool somehow
  207. })
  208. })
  209. }
  210. // Entry point for the application. Starts all servers.
  211. func (g *guerrilla) Start() error {
  212. var startErrors Errors
  213. g.guard.Lock()
  214. defer func() {
  215. g.state = GuerrillaStateStarted
  216. g.guard.Unlock()
  217. }()
  218. if len(g.servers) == 0 {
  219. return append(startErrors, errors.New("No servers to start, please check the config"))
  220. }
  221. // channel for reading errors
  222. errs := make(chan error, len(g.servers))
  223. var startWG sync.WaitGroup
  224. // start servers, send any errors back to errs channel
  225. for ListenInterface := range g.servers {
  226. if !g.servers[ListenInterface].isEnabled() {
  227. // not enabled
  228. continue
  229. }
  230. if g.servers[ListenInterface].state != ServerStateNew &&
  231. g.servers[ListenInterface].state != ServerStateStopped {
  232. continue
  233. }
  234. startWG.Add(1)
  235. go func(s *server) {
  236. if err := s.Start(&startWG); err != nil {
  237. errs <- err
  238. }
  239. }(g.servers[ListenInterface])
  240. }
  241. // wait for all servers to start (or fail)
  242. startWG.Wait()
  243. // close, then read any errors
  244. close(errs)
  245. for err := range errs {
  246. if err != nil {
  247. startErrors = append(startErrors, err)
  248. }
  249. }
  250. if len(startErrors) > 0 {
  251. return startErrors
  252. } else {
  253. if gw, ok := g.backend.(*backends.BackendGateway); ok {
  254. if gw.State == backends.BackendStateShuttered {
  255. _ = gw.Reinitialize()
  256. }
  257. }
  258. }
  259. return nil
  260. }
  261. func (g *guerrilla) Shutdown() {
  262. g.guard.Lock()
  263. defer func() {
  264. g.state = GuerrillaStateStopped
  265. defer g.guard.Unlock()
  266. }()
  267. for ListenInterface, s := range g.servers {
  268. if s.state == ServerStateRunning {
  269. s.Shutdown()
  270. log.Infof("shutdown completed for [%s]", ListenInterface)
  271. }
  272. }
  273. if err := g.backend.Shutdown(); err != nil {
  274. log.WithError(err).Warn("Backend failed to shutdown")
  275. } else {
  276. log.Infof("Backend shutdown completed")
  277. }
  278. }
  279. func (g *guerrilla) Subscribe(topic string, fn interface{}) error {
  280. return g.bus.Subscribe(topic, fn)
  281. }
  282. func (g *guerrilla) Publish(topic string, args ...interface{}) {
  283. g.bus.Publish(topic, args...)
  284. }
  285. func (g *guerrilla) Unsubscribe(topic string, handler interface{}) error {
  286. return g.bus.Unsubscribe(topic, handler)
  287. }