gateway.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/flashmob/go-guerrilla/envelope"
  9. "github.com/flashmob/go-guerrilla/log"
  10. "github.com/flashmob/go-guerrilla/response"
  11. )
  12. // A backend gateway is a proxy that implements the Backend interface.
  13. // It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
  14. // via a channel. Shutting down via Shutdown() will stop all workers.
  15. // The rest of this program always talks to the backend via this gateway.
  16. type BackendGateway struct {
  17. AbstractBackend
  18. saveMailChan chan *savePayload
  19. // waits for backend workers to start/stop
  20. wg sync.WaitGroup
  21. b Worker
  22. // controls access to state
  23. stateGuard sync.Mutex
  24. State backendState
  25. config BackendConfig
  26. }
  // Possible values for BackendGateway.State. They are untyped integer
  // constants numbered from zero in declaration order.
  const (
  	// BackendStateRunning means the workers are up and accepting mail.
  	BackendStateRunning = iota
  	// BackendStateShuttered means the gateway was shut down.
  	BackendStateShuttered
  	// BackendStateError means initialization failed.
  	BackendStateError
  )

  // backendState is the integer type used to record the gateway's state.
  type backendState int

  // String renders the state as its decimal number, e.g. "0" for
  // BackendStateRunning.
  func (s backendState) String() string {
  	return strconv.FormatInt(int64(s), 10)
  }
  37. // New retrieve a backend specified by the backendName, and initialize it using
  38. // backendConfig
  39. func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
  40. backend, found := backends[backendName]
  41. mainlog = l
  42. if !found {
  43. return nil, fmt.Errorf("backend %q not found", backendName)
  44. }
  45. gateway := &BackendGateway{b: backend, config: backendConfig}
  46. err := gateway.Initialize(backendConfig)
  47. if err != nil {
  48. return nil, fmt.Errorf("error while initializing the backend: %s", err)
  49. }
  50. gateway.State = BackendStateRunning
  51. return gateway, nil
  52. }
  53. // Process distributes an envelope to one of the backend workers
  54. func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
  55. if gw.State != BackendStateRunning {
  56. return NewBackendResult(response.Canned.FailBackendNotRunning + gw.State.String())
  57. }
  58. // place on the channel so that one of the save mail workers can pick it up
  59. savedNotify := make(chan *saveStatus)
  60. gw.saveMailChan <- &savePayload{e, savedNotify}
  61. // wait for the save to complete
  62. // or timeout
  63. select {
  64. case status := <-savedNotify:
  65. if status.err != nil {
  66. return NewBackendResult(response.Canned.FailBackendTransaction + status.err.Error())
  67. }
  68. return NewBackendResult(response.Canned.SuccessMessageQueued + status.hash)
  69. case <-time.After(time.Second * 30):
  70. mainlog.Infof("Backend has timed out")
  71. return NewBackendResult(response.Canned.FailBackendTimeout)
  72. }
  73. }
  74. func (gw *BackendGateway) Shutdown() error {
  75. gw.stateGuard.Lock()
  76. defer gw.stateGuard.Unlock()
  77. if gw.State != BackendStateShuttered {
  78. err := gw.b.Shutdown()
  79. if err == nil {
  80. close(gw.saveMailChan) // workers will stop
  81. gw.wg.Wait()
  82. gw.State = BackendStateShuttered
  83. }
  84. return err
  85. }
  86. return nil
  87. }
  88. // Reinitialize starts up a backend gateway that was shutdown before
  89. func (gw *BackendGateway) Reinitialize() error {
  90. if gw.State != BackendStateShuttered {
  91. return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
  92. }
  93. err := gw.Initialize(gw.config)
  94. if err != nil {
  95. return fmt.Errorf("error while initializing the backend: %s", err)
  96. }
  97. gw.State = BackendStateRunning
  98. return err
  99. }
  100. func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
  101. err := gw.b.Initialize(cfg)
  102. if err == nil {
  103. workersSize := gw.b.getNumberOfWorkers()
  104. if workersSize < 1 {
  105. gw.State = BackendStateError
  106. return errors.New("Must have at least 1 worker")
  107. }
  108. if err := gw.b.testSettings(); err != nil {
  109. gw.State = BackendStateError
  110. return err
  111. }
  112. gw.saveMailChan = make(chan *savePayload, workersSize)
  113. // start our savemail workers
  114. gw.wg.Add(workersSize)
  115. for i := 0; i < workersSize; i++ {
  116. go func() {
  117. gw.b.saveMailWorker(gw.saveMailChan)
  118. gw.wg.Done()
  119. }()
  120. }
  121. } else {
  122. gw.State = BackendStateError
  123. }
  124. return err
  125. }