| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- package backends
- import (
- "errors"
- "runtime/debug"
- )
// Worker is the receiver type for the backend's message dispatch loop
// (see workDispatcher). It carries no state of its own.
type Worker struct{}
- func (w *Worker) workDispatcher(workIn chan *workerMsg, validateRcpt chan *workerMsg, p Processor, workerId int) {
- defer func() {
- if r := recover(); r != nil {
- // recover form closed channel
- Log().Error("Recovered form panic:", r, string(debug.Stack()))
- }
- // close any connections / files
- Svc.shutdown()
- }()
- Log().Infof("Save mail worker started (#%d)", workerId)
- for {
- select {
- case msg := <-workIn:
- if msg == nil {
- Log().Debug("No more messages from saveMail")
- return
- }
- // process the email here
- // TODO we should check the err
- result, _ := p.Process(msg.mail, TaskSaveMail)
- if result.Code() < 300 {
- // if all good, let the gateway know that it was queued
- msg.notifyMe <- ¬ifyMsg{nil, msg.mail.QueuedId}
- } else {
- // notify the gateway about the error
- msg.notifyMe <- ¬ifyMsg{err: errors.New(result.String())}
- }
- case msg := <-validateRcpt:
- _, err := p.Process(msg.mail, TaskValidateRcpt)
- if err != nil {
- // validation failed
- msg.notifyMe <- ¬ifyMsg{err: err}
- } else {
- // all good.
- msg.notifyMe <- ¬ifyMsg{err: nil}
- }
- }
- }
- }
|