guerrilla_db_redis.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. "github.com/garyburd/redigo/redis"
  8. "github.com/flashmob/go-guerrilla/envelope"
  9. "github.com/ziutek/mymysql/autorc"
  10. _ "github.com/ziutek/mymysql/godrv"
  11. )
  12. func init() {
  13. backends["guerrilla-db-redis"] = &AbstractBackend{
  14. extend: &GuerrillaDBAndRedisBackend{}}
  15. }
  16. type GuerrillaDBAndRedisBackend struct {
  17. AbstractBackend
  18. config guerrillaDBAndRedisConfig
  19. }
  20. type guerrillaDBAndRedisConfig struct {
  21. NumberOfWorkers int `json:"save_workers_size"`
  22. MysqlTable string `json:"mail_table"`
  23. MysqlDB string `json:"mysql_db"`
  24. MysqlHost string `json:"mysql_host"`
  25. MysqlPass string `json:"mysql_pass"`
  26. MysqlUser string `json:"mysql_user"`
  27. RedisExpireSeconds int `json:"redis_expire_seconds"`
  28. RedisInterface string `json:"redis_interface"`
  29. PrimaryHost string `json:"primary_mail_host"`
  30. }
  31. func convertError(name string) error {
  32. return fmt.Errorf("failed to load backend config (%s)", name)
  33. }
  34. // Load the backend config for the backend. It has already been unmarshalled
  35. // from the main config file 'backend' config "backend_config"
  36. // Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
  37. func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
  38. configType := baseConfig(&guerrillaDBAndRedisConfig{})
  39. bcfg, err := g.extractConfig(backendConfig, configType)
  40. if err != nil {
  41. return err
  42. }
  43. m := bcfg.(*guerrillaDBAndRedisConfig)
  44. g.config = *m
  45. return nil
  46. }
  47. func (g *GuerrillaDBAndRedisBackend) getNumberOfWorkers() int {
  48. return g.config.NumberOfWorkers
  49. }
  50. func (g *GuerrillaDBAndRedisBackend) Process(mail *envelope.Envelope) BackendResult {
  51. to := mail.RcptTo
  52. log.Info("(g *GuerrillaDBAndRedisBackend) Process called")
  53. if len(to) == 0 {
  54. return NewBackendResult("554 Error: no recipient")
  55. }
  56. return nil
  57. }
  58. type redisClient struct {
  59. isConnected bool
  60. conn redis.Conn
  61. time int
  62. }
  63. func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
  64. var to, body string
  65. var err error
  66. var redisErr error
  67. var length int
  68. redisClient := &redisClient{}
  69. db := autorc.New(
  70. "tcp",
  71. "",
  72. g.config.MysqlHost,
  73. g.config.MysqlUser,
  74. g.config.MysqlPass,
  75. g.config.MysqlDB)
  76. db.Register("set names utf8")
  77. sql := "INSERT INTO " + g.config.MysqlTable + " "
  78. sql += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
  79. sql += " values (NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
  80. ins, sqlErr := db.Prepare(sql)
  81. if sqlErr != nil {
  82. log.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
  83. }
  84. sql = "UPDATE gm2_setting SET `setting_value` = `setting_value`+1 WHERE `setting_name`='received_emails' LIMIT 1"
  85. incr, sqlErr := db.Prepare(sql)
  86. if sqlErr != nil {
  87. log.WithError(sqlErr).Fatalf("failed while db.Prepare(UPDATE...)")
  88. }
  89. defer func() {
  90. if r := recover(); r != nil {
  91. //recover form closed channel
  92. fmt.Println("Recovered in f", r)
  93. }
  94. if db.Raw != nil {
  95. db.Raw.Close()
  96. }
  97. if redisClient.conn != nil {
  98. log.Infof("closed redis")
  99. redisClient.conn.Close()
  100. }
  101. }()
  102. // receives values from the channel repeatedly until it is closed.
  103. for {
  104. payload := <-saveMailChan
  105. if payload == nil {
  106. log.Debug("No more saveMailChan payload")
  107. return
  108. }
  109. to = payload.recipient.User + "@" + g.config.PrimaryHost
  110. length = len(payload.mail.Data)
  111. ts := fmt.Sprintf("%d", time.Now().UnixNano())
  112. payload.mail.Subject = MimeHeaderDecode(payload.mail.Subject)
  113. hash := MD5Hex(
  114. to,
  115. payload.mail.MailFrom.String(),
  116. payload.mail.Subject,
  117. ts)
  118. // Add extra headers
  119. var addHead string
  120. addHead += "Delivered-To: " + to + "\r\n"
  121. addHead += "Received: from " + payload.mail.Helo + " (" + payload.mail.Helo + " [" + payload.mail.RemoteAddress + "])\r\n"
  122. addHead += " by " + payload.recipient.Host + " with SMTP id " + hash + "@" + payload.recipient.Host + ";\r\n"
  123. addHead += " " + time.Now().Format(time.RFC1123Z) + "\r\n"
  124. // compress to save space
  125. payload.mail.Data = Compress(addHead, payload.mail.Data)
  126. body = "gzencode"
  127. redisErr = redisClient.redisConnection(g.config.RedisInterface)
  128. if redisErr == nil {
  129. _, doErr := redisClient.conn.Do("SETEX", hash, g.config.RedisExpireSeconds, payload.mail.Data)
  130. if doErr == nil {
  131. payload.mail.Data = ""
  132. body = "redis"
  133. }
  134. } else {
  135. log.WithError(redisErr).Warn("Error while SETEX on redis")
  136. }
  137. // bind data to cursor
  138. ins.Bind(
  139. to,
  140. payload.mail.MailFrom.String(),
  141. payload.mail.Subject,
  142. body,
  143. payload.mail.Data,
  144. hash,
  145. to,
  146. payload.mail.RemoteAddress,
  147. payload.mail.MailFrom.String(),
  148. payload.mail.TLS,
  149. )
  150. // save, discard result
  151. _, _, err = ins.Exec()
  152. if err != nil {
  153. errMsg := "Database error while inserting"
  154. log.WithError(err).Warn(errMsg)
  155. payload.savedNotify <- &saveStatus{errors.New(errMsg), hash}
  156. } else {
  157. log.Debugf("Email saved %s (len=%d)", hash, length)
  158. _, _, err = incr.Exec()
  159. if err != nil {
  160. log.WithError(err).Warn("Database error while incr count")
  161. }
  162. payload.savedNotify <- &saveStatus{nil, hash}
  163. }
  164. }
  165. }
  166. func (c *redisClient) redisConnection(redisInterface string) (err error) {
  167. if c.isConnected == false {
  168. c.conn, err = redis.Dial("tcp", redisInterface)
  169. if err != nil {
  170. // handle error
  171. return err
  172. }
  173. c.isConnected = true
  174. }
  175. return nil
  176. }
  177. // test database connection settings
  178. func (g *GuerrillaDBAndRedisBackend) testSettings() (err error) {
  179. db := autorc.New(
  180. "tcp",
  181. "",
  182. g.config.MysqlHost,
  183. g.config.MysqlUser,
  184. g.config.MysqlPass,
  185. g.config.MysqlDB)
  186. if mysqlErr := db.Raw.Connect(); mysqlErr != nil {
  187. err = fmt.Errorf("MySql cannot connect, check your settings: %s", mysqlErr)
  188. } else {
  189. db.Raw.Close()
  190. }
  191. redisClient := &redisClient{}
  192. if redisErr := redisClient.redisConnection(g.config.RedisInterface); redisErr != nil {
  193. err = fmt.Errorf("Redis cannot connect, check your settings: %s", redisErr)
  194. }
  195. return
  196. }