envelope.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package mail
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/md5"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "mime"
  10. "net/mail"
  11. "net/textproto"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  // Dec is the WordDecoder that MimeHeaderDecode uses to decode MIME headers
  // containing RFC 2047 encoded-words.
  // It is exported so that an alternative decoder can be installed, e.g. GNU
  // iconv by importing the mail/iconv package.
  // Another alternative would be https://godoc.org/golang.org/x/text/encoding
  var (
  	Dec mime.WordDecoder
  )

  func init() {
  	// Start with the standard library decoder (no GNU iconv). Import the
  	// mail/iconv package to use iconv instead.
  	var std mime.WordDecoder
  	Dec = std
  }
  // maxHeaderChunk is the number of leading bytes of a message that are scanned
  // for the end of the header: 3 KiB plus one byte.
  const maxHeaderChunk = 3*1024 + 1
  // Address encodes an email address of the form `<user@host>`.
  type Address struct {
  	// User is the local part.
  	User string
  	// Host is the domain.
  	Host string
  	// ADL is the at-domain list, if one was matched.
  	ADL []string
  	// PathParams contains any ESMTP parameters that were matched.
  	PathParams [][]string
  	// NullPath is true if <> was received.
  	NullPath bool
  }

  // String returns the address as "user@host", without angle brackets.
  func (ep *Address) String() string {
  	return ep.User + "@" + ep.Host
  }

  // IsEmpty reports whether both the local part and the domain are empty.
  func (ep *Address) IsEmpty() bool {
  	return ep.User == "" && ep.Host == ""
  }

  // ap is the shared parser used by NewAddress.
  var ap = mail.AddressParser{}

  // NewAddress takes a string holding an RFC 5322 address of the
  // form "Gogh Fir <gf@example.com>" or "foo@example.com" and returns its
  // local part and domain. Any display name is discarded.
  //
  // It returns the parser's error when str is not a valid address, and
  // "invalid address" when the parsed address has no local part.
  func NewAddress(str string) (Address, error) {
  	parsed, err := ap.Parse(str)
  	if err != nil {
  		return Address{}, err
  	}
  	// Split at the LAST '@': the parser hands back a quoted local part in
  	// unquoted form, so the local part may itself contain '@', whereas the
  	// domain never does.
  	at := strings.LastIndex(parsed.Address, "@")
  	if at <= 0 {
  		return Address{}, errors.New("invalid address")
  	}
  	return Address{
  		User: parsed.Address[:at],
  		Host: parsed.Address[at+1:],
  	}, nil
  }
  64. // Email represents a single SMTP message.
  65. type Envelope struct {
  66. // Remote IP address
  67. RemoteIP string
  68. // Message sent in EHLO command
  69. Helo string
  70. // Sender
  71. MailFrom Address
  72. // Recipients
  73. RcptTo []Address
  74. // Data stores the header and message body
  75. Data bytes.Buffer
  76. // Subject stores the subject of the email, extracted and decoded after calling ParseHeaders()
  77. Subject string
  78. // TLS is true if the email was received using a TLS connection
  79. TLS bool
  80. // Header stores the results from ParseHeaders()
  81. Header textproto.MIMEHeader
  82. // Values hold the values generated when processing the envelope by the backend
  83. Values map[string]interface{}
  84. // Hashes of each email on the rcpt
  85. Hashes []string
  86. // additional delivery header that may be added
  87. DeliveryHeader string
  88. // Email(s) will be queued with this id
  89. QueuedId string
  90. // When locked, it means that the envelope is being processed by the backend
  91. sync.Mutex
  92. }
  93. func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
  94. return &Envelope{
  95. RemoteIP: remoteAddr,
  96. Values: make(map[string]interface{}),
  97. QueuedId: queuedID(clientID),
  98. }
  99. }
  // queuedID returns a 32-character lowercase hex id: the MD5 digest of the
  // current time and clientID.
  //
  // Both numbers are rendered in decimal. Converting them with string(n) would
  // instead produce a single rune (U+FFFD for anything above 0x10FFFF), which
  // removes the timestamp from the hash and makes large client ids collide.
  // The separator keeps pairs such as (1, 23) and (12, 3) distinct.
  func queuedID(clientID uint64) string {
  	seed := fmt.Sprintf("%d.%d", time.Now().UnixNano(), clientID)
  	return fmt.Sprintf("%x", md5.Sum([]byte(seed)))
  }
  103. // ParseHeaders parses the headers into Header field of the Envelope struct.
  104. // Data buffer must be full before calling.
  105. // It assumes that at most 30kb of email data can be a header
  106. // Decoding of encoding to UTF is only done on the Subject, where the result is assigned to the Subject field
  107. func (e *Envelope) ParseHeaders() error {
  108. var err error
  109. if e.Header != nil {
  110. return errors.New("headers already parsed")
  111. }
  112. buf := bytes.NewBuffer(e.Data.Bytes())
  113. // find where the header ends, assuming that over 30 kb would be max
  114. max := maxHeaderChunk
  115. if buf.Len() < max {
  116. max = buf.Len()
  117. }
  118. // read in the chunk which we'll scan for the header
  119. chunk := make([]byte, max)
  120. buf.Read(chunk)
  121. headerEnd := strings.Index(string(chunk), "\n\n") // the first two new-lines chars are the End Of Header
  122. if headerEnd > -1 {
  123. header := chunk[0:headerEnd]
  124. headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(header)))
  125. e.Header, err = headerReader.ReadMIMEHeader()
  126. if err != nil {
  127. // decode the subject
  128. if subject, ok := e.Header["Subject"]; ok {
  129. e.Subject = MimeHeaderDecode(subject[0])
  130. }
  131. }
  132. } else {
  133. err = errors.New("header not found")
  134. }
  135. return err
  136. }
  137. // Len returns the number of bytes that would be in the reader returned by NewReader()
  138. func (e *Envelope) Len() int {
  139. return len(e.DeliveryHeader) + e.Data.Len()
  140. }
  141. // Returns a new reader for reading the email contents, including the delivery headers
  142. func (e *Envelope) NewReader() io.Reader {
  143. return io.MultiReader(
  144. strings.NewReader(e.DeliveryHeader),
  145. bytes.NewReader(e.Data.Bytes()),
  146. )
  147. }
  148. // String converts the email to string.
  149. // Typically, you would want to use the compressor guerrilla.Processor for more efficiency, or use NewReader
  150. func (e *Envelope) String() string {
  151. return e.DeliveryHeader + e.Data.String()
  152. }
  153. // ResetTransaction is called when the transaction is reset (keeping the connection open)
  154. func (e *Envelope) ResetTransaction() {
  155. // ensure not processing by the backend, will only get lock if finished, otherwise block
  156. e.Lock()
  157. // got the lock, it means processing finished
  158. e.Unlock()
  159. e.MailFrom = Address{}
  160. e.RcptTo = []Address{}
  161. // reset the data buffer, keep it allocated
  162. e.Data.Reset()
  163. // todo: these are probably good candidates for buffers / use sync.Pool (after profiling)
  164. e.Subject = ""
  165. e.Header = nil
  166. e.Hashes = make([]string, 0)
  167. e.DeliveryHeader = ""
  168. e.Values = make(map[string]interface{})
  169. }
  170. // Seed is called when used with a new connection, once it's accepted
  171. func (e *Envelope) Reseed(RemoteIP string, clientID uint64) {
  172. e.RemoteIP = RemoteIP
  173. e.QueuedId = queuedID(clientID)
  174. e.Helo = ""
  175. e.TLS = false
  176. }
  177. // PushRcpt adds a recipient email address to the envelope
  178. func (e *Envelope) PushRcpt(addr Address) {
  179. e.RcptTo = append(e.RcptTo, addr)
  180. }
  181. // Pop removes the last email address that was pushed to the envelope
  182. func (e *Envelope) PopRcpt() Address {
  183. ret := e.RcptTo[len(e.RcptTo)-1]
  184. e.RcptTo = e.RcptTo[:len(e.RcptTo)-1]
  185. return ret
  186. }
  187. // Converts 7 bit encoded mime header strings to UTF-8
  188. func MimeHeaderDecode(str string) string {
  189. state := 0
  190. var buf bytes.Buffer
  191. var out []byte
  192. for i := 0; i < len(str); i++ {
  193. switch state {
  194. case 0:
  195. if str[i] == '=' {
  196. buf.WriteByte(str[i])
  197. state = 1
  198. } else {
  199. out = append(out, str[i])
  200. }
  201. case 1:
  202. if str[i] == '?' {
  203. buf.WriteByte(str[i])
  204. state = 2
  205. } else {
  206. out = append(out, str[i])
  207. buf.Reset()
  208. state = 0
  209. }
  210. case 2:
  211. if str[i] == ' ' {
  212. d, err := Dec.Decode(buf.String())
  213. if err == nil {
  214. out = append(out, []byte(d)...)
  215. } else {
  216. out = append(out, buf.Bytes()...)
  217. }
  218. out = append(out, ' ')
  219. buf.Reset()
  220. state = 0
  221. } else {
  222. buf.WriteByte(str[i])
  223. }
  224. }
  225. }
  226. if buf.Len() > 0 {
  227. d, err := Dec.Decode(buf.String())
  228. if err == nil {
  229. out = append(out, []byte(d)...)
  230. } else {
  231. out = append(out, buf.Bytes()...)
  232. }
  233. }
  234. return string(out)
  235. }
  236. // Envelopes have their own pool
  237. type Pool struct {
  238. // envelopes that are ready to be borrowed
  239. pool chan *Envelope
  240. // semaphore to control number of maximum borrowed envelopes
  241. sem chan bool
  242. }
  243. func NewPool(poolSize int) *Pool {
  244. return &Pool{
  245. pool: make(chan *Envelope, poolSize),
  246. sem: make(chan bool, poolSize),
  247. }
  248. }
  249. func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
  250. var e *Envelope
  251. p.sem <- true // block the envelope until more room
  252. select {
  253. case e = <-p.pool:
  254. e.Reseed(remoteAddr, clientID)
  255. default:
  256. e = NewEnvelope(remoteAddr, clientID)
  257. }
  258. return e
  259. }
  260. // Return returns an envelope back to the envelope pool
  261. // Make sure that envelope finished processing before calling this
  262. func (p *Pool) Return(e *Envelope) {
  263. select {
  264. case p.pool <- e:
  265. //placed envelope back in pool
  266. default:
  267. // pool is full, discard it
  268. }
  269. // take a value off the semaphore to make room for more envelopes
  270. <-p.sem
  271. }