save_mail.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/garyburd/redigo/redis"
  5. "github.com/ziutek/mymysql/autorc"
  6. _ "github.com/ziutek/mymysql/godrv"
  7. "log"
  8. "strconv"
  9. "time"
  10. "errors"
  11. )
  12. type savePayload struct {
  13. client *Client
  14. server *SmtpdServer
  15. }
  16. var SaveMailChan chan *savePayload // workers for saving mail
  17. type redisClient struct {
  18. count int
  19. conn redis.Conn
  20. time int
  21. }
  22. func saveMail() {
  23. var to, recipient, body string
  24. var err error
  25. var redis_err error
  26. var length int
  27. redisClient := &redisClient{}
  28. db := autorc.New(
  29. "tcp",
  30. "",
  31. mainConfig.Mysql_host,
  32. mainConfig.Mysql_user,
  33. mainConfig.Mysql_pass,
  34. mainConfig.Mysql_db)
  35. db.Register("set names utf8")
  36. sql := "INSERT INTO " + mainConfig.Mysql_table + " "
  37. sql += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
  38. sql += " values (NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
  39. ins, sql_err := db.Prepare(sql)
  40. if sql_err != nil {
  41. log.Fatalf(fmt.Sprintf("Sql statement incorrect: %s\n", sql_err))
  42. }
  43. sql = "UPDATE gm2_setting SET `setting_value` = `setting_value`+1 WHERE `setting_name`='received_emails' LIMIT 1"
  44. incr, sql_err := db.Prepare(sql)
  45. if sql_err != nil {
  46. log.Fatalf(fmt.Sprintf("Sql statement incorrect: %s\n", sql_err))
  47. }
  48. // receives values from the channel repeatedly until it is closed.
  49. for {
  50. payload := <-SaveMailChan
  51. if user, host, addr_err := validateEmailData(payload.client); addr_err != nil {
  52. payload.server.logln(1, fmt.Sprintf("mail_from didnt validate: %v", addr_err)+" client.mail_from:"+payload.client.mail_from)
  53. // notify client that a save completed, -1 = error
  54. payload.client.savedNotify <- -1
  55. continue
  56. } else {
  57. recipient = user + "@" + host
  58. to = user + "@" + mainConfig.Primary_host
  59. }
  60. length = len(payload.client.data)
  61. ts := strconv.FormatInt(time.Now().UnixNano(), 10);
  62. payload.client.subject = mimeHeaderDecode(payload.client.subject)
  63. payload.client.hash = md5hex(
  64. &to,
  65. &payload.client.mail_from,
  66. &payload.client.subject,
  67. &ts)
  68. // Add extra headers
  69. add_head := ""
  70. add_head += "Delivered-To: " + to + "\r\n"
  71. add_head += "Received: from " + payload.client.helo + " (" + payload.client.helo + " [" + payload.client.address + "])\r\n"
  72. add_head += " by " + payload.server.Config.Host_name + " with SMTP id " + payload.client.hash + "@" +
  73. payload.server.Config.Host_name + ";\r\n"
  74. add_head += " " + time.Now().Format(time.RFC1123Z) + "\r\n"
  75. // compress to save space
  76. payload.client.data = compress(&add_head, &payload.client.data)
  77. body = "gzencode"
  78. redis_err = redisClient.redisConnection()
  79. if redis_err == nil {
  80. _, do_err := redisClient.conn.Do("SETEX", payload.client.hash, mainConfig.Redis_expire_seconds, payload.client.data)
  81. if do_err == nil {
  82. payload.client.data = ""
  83. body = "redis"
  84. }
  85. } else {
  86. payload.server.logln(1, fmt.Sprintf("redis: %v", redis_err))
  87. }
  88. // bind data to cursor
  89. ins.Bind(
  90. to,
  91. payload.client.mail_from,
  92. payload.client.subject,
  93. body,
  94. payload.client.data,
  95. payload.client.hash,
  96. recipient,
  97. payload.client.address,
  98. payload.client.mail_from,
  99. payload.client.tls_on,
  100. )
  101. // save, discard result
  102. _, _, err = ins.Exec()
  103. if err != nil {
  104. payload.server.logln(1, fmt.Sprintf("Database error, %v ", err))
  105. payload.client.savedNotify <- -1
  106. } else {
  107. payload.server.logln(0, "Email saved "+payload.client.hash+" len:"+strconv.Itoa(length))
  108. _, _, err = incr.Exec()
  109. if err != nil {
  110. payload.server.logln(1, fmt.Sprintf("Failed to incr count: %v", err))
  111. }
  112. payload.client.savedNotify <- 1
  113. }
  114. }
  115. }
  116. func (c *redisClient) redisConnection() (err error) {
  117. if c.count == 0 {
  118. c.conn, err = redis.Dial("tcp", mainConfig.Redis_interface)
  119. if err != nil {
  120. // handle error
  121. return err
  122. }
  123. }
  124. return nil
  125. }
  126. // test database connection settings
  127. func testDbConnections() (err error) {
  128. db := autorc.New(
  129. "tcp",
  130. "",
  131. mainConfig.Mysql_host,
  132. mainConfig.Mysql_user,
  133. mainConfig.Mysql_pass,
  134. mainConfig.Mysql_db)
  135. if mysql_err := db.Raw.Connect(); mysql_err != nil {
  136. err = errors.New("MySql cannot connect, check your settings. " + mysql_err.Error() )
  137. } else {
  138. db.Raw.Close();
  139. }
  140. redisClient := &redisClient{}
  141. if redis_err := redisClient.redisConnection(); redis_err != nil {
  142. err = errors.New("Redis cannot connect, check your settings. " + redis_err.Error())
  143. }
  144. return
  145. }