1
0

worker.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package backends
  2. import (
  3. "errors"
  4. "runtime/debug"
  5. )
  6. type Worker struct{}
  7. func (w *Worker) workDispatcher(workIn chan *workerMsg, validateRcpt chan *workerMsg, p Processor, workerId int) {
  8. defer func() {
  9. if r := recover(); r != nil {
  10. // recover form closed channel
  11. Log().Error("Recovered form panic:", r, string(debug.Stack()))
  12. }
  13. // close any connections / files
  14. Svc.shutdown()
  15. }()
  16. Log().Infof("Save mail worker started (#%d)", workerId)
  17. for {
  18. select {
  19. case msg := <-workIn:
  20. if msg == nil {
  21. Log().Debug("No more messages from saveMail")
  22. return
  23. }
  24. // process the email here
  25. // TODO we should check the err
  26. result, _ := p.Process(msg.mail, TaskSaveMail)
  27. if result.Code() < 300 {
  28. // if all good, let the gateway know that it was queued
  29. msg.notifyMe <- &notifyMsg{nil, msg.mail.QueuedId}
  30. } else {
  31. // notify the gateway about the error
  32. msg.notifyMe <- &notifyMsg{err: errors.New(result.String())}
  33. }
  34. case msg := <-validateRcpt:
  35. _, err := p.Process(msg.mail, TaskValidateRcpt)
  36. if err != nil {
  37. // validation failed
  38. msg.notifyMe <- &notifyMsg{err: err}
  39. } else {
  40. // all good.
  41. msg.notifyMe <- &notifyMsg{err: nil}
  42. }
  43. }
  44. }
  45. }