gateway.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/flashmob/go-guerrilla/mail"
  9. "github.com/flashmob/go-guerrilla/response"
  10. "strings"
  11. )
  12. // A backend gateway is a proxy that implements the Backend interface.
  13. // It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
  14. // via a channel. Shutting down via Shutdown() will stop all workers.
  15. // The rest of this program always talks to the backend via this gateway.
  16. type BackendGateway struct {
  17. // channel for distributing envelopes to workers
  18. conveyor chan *workerMsg
  19. // waits for backend workers to start/stop
  20. wg sync.WaitGroup
  21. w *Worker
  22. // controls access to state
  23. sync.Mutex
  24. State backendState
  25. config BackendConfig
  26. gwConfig *GatewayConfig
  27. }
  // GatewayConfig holds the gateway's own settings as read from the backend
  // configuration.
  type GatewayConfig struct {
  	// WorkersSize is the number of workers to start (save_workers_size)
  	WorkersSize int `json:"save_workers_size,omitempty"`
  	// ProcessorStack is a "|"-separated list of processor names (process_stack)
  	ProcessorStack string `json:"process_stack,omitempty"`
  }
  32. // workerMsg is what get placed on the BackendGateway.saveMailChan channel
  33. type workerMsg struct {
  34. // The email data
  35. e *mail.Envelope
  36. // savedNotify is used to notify that the save operation completed
  37. notifyMe chan *notifyMsg
  38. // select the task type
  39. task SelectTask
  40. }
  // Possible values for BackendGateway.State, numbered consecutively from 0.
  const (
  	BackendStateRunning = iota
  	BackendStateShuttered
  	BackendStateError
  )

  // Gateway defaults.
  const (
  	// processTimeout is how long Process waits before reporting a timeout
  	processTimeout = 30 * time.Second
  	// defaultProcessor is the processor stack used when process_stack is empty
  	defaultProcessor = "Debugger"
  )
  // backendState is the type of BackendGateway.State.
  type backendState int

  // String renders the state as its decimal integer value.
  func (s backendState) String() string {
  	return strconv.FormatInt(int64(s), 10)
  }
  53. // Process distributes an envelope to one of the backend workers
  54. func (gw *BackendGateway) Process(e *mail.Envelope) Result {
  55. if gw.State != BackendStateRunning {
  56. return NewResult(response.Canned.FailBackendNotRunning + gw.State.String())
  57. }
  58. // place on the channel so that one of the save mail workers can pick it up
  59. savedNotify := make(chan *notifyMsg)
  60. gw.conveyor <- &workerMsg{e, savedNotify, TaskSaveMail}
  61. // wait for the save to complete
  62. // or timeout
  63. select {
  64. case status := <-savedNotify:
  65. if status.err != nil {
  66. return NewResult(response.Canned.FailBackendTransaction + status.err.Error())
  67. }
  68. return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)
  69. case <-time.After(processTimeout):
  70. Log().Infof("Backend has timed out")
  71. return NewResult(response.Canned.FailBackendTimeout)
  72. }
  73. }
  74. // ValidateRcpt asks one of the workers to validate the recipient
  75. // Only the last recipient appended to e.RcptTo will be validated.
  76. func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
  77. if gw.State != BackendStateRunning {
  78. return StorageNotAvailable
  79. }
  80. // place on the channel so that one of the save mail workers can pick it up
  81. notify := make(chan *notifyMsg)
  82. gw.conveyor <- &workerMsg{e, notify, TaskValidateRcpt}
  83. // wait for the validation to complete
  84. // or timeout
  85. select {
  86. case status := <-notify:
  87. if status.err != nil {
  88. return status.err
  89. }
  90. return nil
  91. case <-time.After(time.Second):
  92. Log().Infof("Backend has timed out")
  93. return StorageTimeout
  94. }
  95. }
  96. // Shutdown shuts down the backend and leaves it in BackendStateShuttered state
  97. func (gw *BackendGateway) Shutdown() error {
  98. gw.Lock()
  99. defer gw.Unlock()
  100. if gw.State != BackendStateShuttered {
  101. close(gw.conveyor) // workers will stop
  102. // wait for workers to stop
  103. gw.wg.Wait()
  104. Svc.shutdown()
  105. gw.State = BackendStateShuttered
  106. }
  107. return nil
  108. }
  109. // Reinitialize starts up a backend gateway that was shutdown before
  110. func (gw *BackendGateway) Reinitialize() error {
  111. if gw.State != BackendStateShuttered {
  112. return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
  113. }
  114. err := gw.Initialize(gw.config)
  115. if err != nil {
  116. return fmt.Errorf("error while initializing the backend: %s", err)
  117. }
  118. gw.State = BackendStateRunning
  119. return err
  120. }
  121. // newProcessorLine creates a new call-stack of decorators and returns as a single Processor
  122. // Decorators are functions of Decorator type, source files prefixed with p_*
  123. // Each decorator does a specific task during the processing stage.
  124. // This function uses the config value process_stack to figure out which Decorator to use
  125. func (gw *BackendGateway) newProcessorLine() Processor {
  126. var decorators []Decorator
  127. cfg := strings.ToLower(strings.TrimSpace(gw.gwConfig.ProcessorStack))
  128. if len(cfg) == 0 {
  129. cfg = defaultProcessor
  130. }
  131. line := strings.Split(cfg, "|")
  132. for i := range line {
  133. name := line[len(line)-1-i] // reverse order, since decorators are stacked
  134. if makeFunc, ok := processors[name]; ok {
  135. decorators = append(decorators, makeFunc())
  136. }
  137. }
  138. // build the call-stack of decorators
  139. p := Decorate(DefaultProcessor{}, decorators...)
  140. return p
  141. }
  142. // loadConfig loads the config for the GatewayConfig
  143. func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
  144. configType := BaseConfig(&GatewayConfig{})
  145. // Note: treat config values as immutable
  146. // if you need to change a config value, change in the file then
  147. // send a SIGHUP
  148. bcfg, err := Svc.ExtractConfig(cfg, configType)
  149. if err != nil {
  150. return err
  151. }
  152. gw.gwConfig = bcfg.(*GatewayConfig)
  153. return nil
  154. }
  155. // Initialize builds the workers and starts each worker in a goroutine
  156. func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
  157. gw.Lock()
  158. defer gw.Unlock()
  159. err := gw.loadConfig(cfg)
  160. if err == nil {
  161. workersSize := gw.workersSize()
  162. if workersSize < 1 {
  163. gw.State = BackendStateError
  164. return errors.New("Must have at least 1 worker")
  165. }
  166. var lines []Processor
  167. for i := 0; i < workersSize; i++ {
  168. lines = append(lines, gw.newProcessorLine())
  169. }
  170. // initialize processors
  171. if err := Svc.initialize(cfg); err != nil {
  172. return err
  173. }
  174. gw.conveyor = make(chan *workerMsg, workersSize)
  175. // start our workers
  176. gw.wg.Add(workersSize)
  177. for i := 0; i < workersSize; i++ {
  178. go func(workerId int) {
  179. gw.w.workDispatcher(gw.conveyor, lines[workerId], workerId+1)
  180. gw.wg.Done()
  181. }(i)
  182. }
  183. } else {
  184. gw.State = BackendStateError
  185. }
  186. return err
  187. }
  188. // workersSize gets the number of workers to use for saving email by reading the save_workers_size config value
  189. // Returns 1 if no config value was set
  190. func (gw *BackendGateway) workersSize() int {
  191. if gw.gwConfig.WorkersSize == 0 {
  192. return 1
  193. }
  194. return gw.gwConfig.WorkersSize
  195. }