proxy.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/flashmob/go-guerrilla/envelope"
  6. "sync"
  7. )
  8. // The ProxyBackend makes it possible to use the old backend system
  9. // which is not using processors
  10. type ProxyBackend struct {
  11. config proxyConfig
  12. extend proxy
  13. saveMailChan chan *savePayload
  14. State backendState
  15. // waits for backend workers to start/stop
  16. wg sync.WaitGroup
  17. }
  18. type savePayload struct {
  19. mail *envelope.Envelope
  20. from *envelope.EmailAddress
  21. recipient *envelope.EmailAddress
  22. savedNotify chan *saveStatus
  23. }
  24. type saveStatus struct {
  25. err error
  26. hash string
  27. }
  28. type proxy interface {
  29. // Backend
  30. /*
  31. saveMailWorker
  32. numberOfWorkersGetter
  33. settingsTester
  34. configLoader
  35. */
  36. }
  37. type saveMailWorker interface {
  38. // start save mail worker(s)
  39. saveMailWorker(chan *savePayload)
  40. }
  41. type numberOfWorkersGetter interface {
  42. // get the number of workers that will be stared
  43. getNumberOfWorkers() int
  44. }
  45. type settingsTester interface {
  46. // test database settings, permissions, correct paths, etc, before starting workers
  47. testSettings() error
  48. }
  49. type configLoader interface {
  50. // parse the configuration files
  51. loadConfig(BackendConfig) error
  52. }
  53. type proxyConfig struct {
  54. LogReceivedMails bool `json:"log_received_mails"`
  55. }
  56. // Your backend should implement this method and set b.config field with a custom config struct
  57. // Therefore, your implementation would have your own custom config type instead of dummyConfig
  58. func (pb *ProxyBackend) loadConfig(backendConfig BackendConfig) (err error) {
  59. // Load the backend config for the backend. It has already been unmarshalled
  60. // from the main config file 'backend' config "backend_config"
  61. // Now we need to convert each type and copy into the dummyConfig struct
  62. configType := BaseConfig(&proxyConfig{})
  63. bcfg, err := Svc.ExtractConfig(backendConfig, configType)
  64. if err != nil {
  65. return err
  66. }
  67. m := bcfg.(*proxyConfig)
  68. pb.config = *m
  69. return nil
  70. }
  71. func (pb *ProxyBackend) initialize(config BackendConfig) error {
  72. if cl, ok := pb.extend.(configLoader); ok {
  73. return cl.loadConfig(config)
  74. }
  75. err := pb.loadConfig(config)
  76. if err != nil {
  77. return err
  78. }
  79. return nil
  80. }
  81. func (pb *ProxyBackend) Initialize(cfg BackendConfig) error {
  82. err := pb.initialize(cfg)
  83. if err == nil {
  84. workersSize := pb.getNumberOfWorkers()
  85. if workersSize < 1 {
  86. pb.State = BackendStateError
  87. return errors.New("Must have at least 1 worker")
  88. }
  89. if err := pb.testSettings(); err != nil {
  90. pb.State = BackendStateError
  91. return err
  92. }
  93. pb.saveMailChan = make(chan *savePayload, workersSize)
  94. // start our savemail workers
  95. pb.wg.Add(workersSize)
  96. for i := 0; i < workersSize; i++ {
  97. go func() {
  98. pb.saveMailWorker(pb.saveMailChan)
  99. pb.wg.Done()
  100. }()
  101. }
  102. } else {
  103. pb.State = BackendStateError
  104. }
  105. return err
  106. }
  107. func (pb *ProxyBackend) Shutdown() error {
  108. if b, ok := pb.extend.(Backend); ok {
  109. return b.Shutdown()
  110. }
  111. return nil
  112. }
  113. func (pb *ProxyBackend) ValidateRcpt(mail *envelope.Envelope) RcptError {
  114. if b, ok := pb.extend.(Backend); ok {
  115. return b.ValidateRcpt(mail)
  116. }
  117. return nil
  118. }
  119. func (pb *ProxyBackend) Process(mail *envelope.Envelope) Result {
  120. if b, ok := pb.extend.(Backend); ok {
  121. return b.Process(mail)
  122. }
  123. mail.ParseHeaders()
  124. if pb.config.LogReceivedMails {
  125. Log().Infof("Mail from: %s / to: %v", mail.MailFrom.String(), mail.RcptTo)
  126. Log().Info("Headers are: %s", mail.Header)
  127. }
  128. return NewResult("250 OK")
  129. }
  130. func (pb *ProxyBackend) saveMailWorker(saveMailChan chan *savePayload) {
  131. if s, ok := pb.extend.(saveMailWorker); ok {
  132. s.saveMailWorker(saveMailChan)
  133. return
  134. }
  135. defer func() {
  136. if r := recover(); r != nil {
  137. // recover form closed channel
  138. fmt.Println("Recovered in f", r)
  139. }
  140. // close any connections / files
  141. // ...
  142. }()
  143. for {
  144. payload := <-saveMailChan
  145. if payload == nil {
  146. Log().Debug("No more saveMailChan payload")
  147. return
  148. }
  149. // process the email here
  150. result := pb.Process(payload.mail)
  151. // if all good
  152. if result.Code() < 300 {
  153. payload.savedNotify <- &saveStatus{nil, "s0m3l337Ha5hva1u3LOL"}
  154. } else {
  155. payload.savedNotify <- &saveStatus{errors.New(result.String()), "s0m3l337Ha5hva1u3LOL"}
  156. }
  157. }
  158. }
  159. func (pb *ProxyBackend) getNumberOfWorkers() int {
  160. if n, ok := pb.extend.(numberOfWorkersGetter); ok {
  161. return n.getNumberOfWorkers()
  162. }
  163. return 1
  164. }
  165. func (b *ProxyBackend) testSettings() error {
  166. if t, ok := b.extend.(settingsTester); ok {
  167. return t.testSettings()
  168. }
  169. return nil
  170. }