| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 | 
							- package backends
 
- import (
 
- 	"errors"
 
- 	"fmt"
 
- 	log "github.com/Sirupsen/logrus"
 
- 	"github.com/flashmob/go-guerrilla/envelope"
 
- 	"strconv"
 
- 	"strings"
 
- 	"sync"
 
- 	"time"
 
- )
 
- // Backends process received mail. Depending on the implementation, they can store mail in the database,
 
- // write to a file, check for spam, re-transmit to another server, etc.
 
- // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
 
- // whether the message was processed successfully.
 
- type Backend interface {
 
- 	// Public methods
 
- 	Process(*envelope.Envelope) BackendResult
 
- 	Initialize(BackendConfig) error
 
- 	Shutdown() error
 
- 	// start save mail worker(s)
 
- 	saveMailWorker(chan *savePayload)
 
- 	// get the number of workers that will be stared
 
- 	getNumberOfWorkers() int
 
- 	// test database settings, permissions, correct paths, etc, before starting workers
 
- 	testSettings() error
 
- 	// parse the configuration files
 
- 	loadConfig(BackendConfig) error
 
- }
 
- type configLoader interface {
 
- 	loadConfig(backendConfig BackendConfig) (err error)
 
- }
 
- type BackendConfig map[string]interface{}
 
- var backends = map[string]Backend{}
 
- type baseConfig interface{}
 
// saveStatus is the outcome of a save attempt, delivered over a
// savePayload's savedNotify channel.
type saveStatus struct {
	// err is non-nil when the save failed.
	err error
	// hash is reported back to the SMTP client in the "queued as" response.
	hash string
}
 
- type savePayload struct {
 
- 	mail        *envelope.Envelope
 
- 	from        *envelope.EmailAddress
 
- 	recipient   *envelope.EmailAddress
 
- 	savedNotify chan *saveStatus
 
- }
 
- type helper struct {
 
- 	saveMailChan chan *savePayload
 
- 	wg           sync.WaitGroup
 
- }
 
// BackendResult represents a response to an SMTP client after receiving DATA.
// The String method should return an SMTP message ready to send back to the
// client, for example `250 OK: Message received`.
type BackendResult interface {
	fmt.Stringer
	// Code should return the SMTP code associated with this response, ie. `250`
	Code() int
}

// backendResult is the internal implementation of BackendResult for use by
// backend implementations. The underlying string is the full SMTP message.
type backendResult string

// String returns the SMTP message exactly as it was supplied.
func (br backendResult) String() string {
	return string(br)
}

// Code parses the SMTP code from the first 3 characters of the SMTP message,
// ignoring surrounding whitespace. It returns 554 if the code cannot be
// parsed, i.e. when the message is shorter than 3 characters or when any of
// the first 3 characters is not an ASCII digit.
func (br backendResult) Code() int {
	const fallback = 554

	trimmed := strings.TrimSpace(string(br))
	if len(trimmed) < 3 {
		return fallback
	}
	prefix := trimmed[:3]
	// strconv.Atoi accepts a leading sign, so "+25" or "-12" would otherwise
	// be reported as a code; insist on three digits first.
	for i := 0; i < len(prefix); i++ {
		if prefix[i] < '0' || prefix[i] > '9' {
			return fallback
		}
	}
	code, err := strconv.Atoi(prefix)
	if err != nil {
		return fallback
	}
	return code
}

// NewBackendResult wraps an SMTP message, such as "250 OK", in a
// BackendResult.
func NewBackendResult(message string) BackendResult {
	return backendResult(message)
}
 
- // A backend gateway is a proxy that implements the Backend interface.
 
- // It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
 
- // via a channel. Shutting down via Shutdown() will stop all workers.
 
- // The rest of this program always talks to the backend via this gateway.
 
- type BackendGateway struct {
 
- 	AbstractBackend
 
- 	saveMailChan chan *savePayload
 
- 	// waits for backend workers to start/stop
 
- 	wg sync.WaitGroup
 
- 	b  Backend
 
- 	// controls access to state
 
- 	stateGuard sync.Mutex
 
- 	state      int
 
- }
 
// Possible values for BackendGateway.state.
const (
	// BackendStateProcessing is the state set by New once the gateway has
	// been initialized. It is also the zero value of the state field.
	BackendStateProcessing = iota
	// BackendStateShutdown is set once Shutdown has completed successfully.
	BackendStateShutdown
)
 
- // New retrieve a backend specified by the backendName, and initialize it using
 
- // backendConfig
 
- func New(backendName string, backendConfig BackendConfig) (Backend, error) {
 
- 	backend, found := backends[backendName]
 
- 	if !found {
 
- 		return nil, fmt.Errorf("backend %q not found", backendName)
 
- 	}
 
- 	p := &BackendGateway{b: backend}
 
- 	err := p.Initialize(backendConfig)
 
- 	if err != nil {
 
- 		return nil, fmt.Errorf("error while initializing the backend: %s", err)
 
- 	}
 
- 	p.state = BackendStateProcessing
 
- 	return p, nil
 
- }
 
- // Distributes an envelope to one of the backend workers
 
- func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
 
- 	to := e.RcptTo
 
- 	from := e.MailFrom
 
- 	// place on the channel so that one of the save mail workers can pick it up
 
- 	// TODO: support multiple recipients
 
- 	savedNotify := make(chan *saveStatus)
 
- 	gw.saveMailChan <- &savePayload{e, from, &to[0], savedNotify}
 
- 	// wait for the save to complete
 
- 	// or timeout
 
- 	select {
 
- 	case status := <-savedNotify:
 
- 		if status.err != nil {
 
- 			return NewBackendResult("554 Error: " + status.err.Error())
 
- 		}
 
- 		return NewBackendResult(fmt.Sprintf("250 OK : queued as %s", status.hash))
 
- 	case <-time.After(time.Second * 30):
 
- 		log.Infof("Backend has timed out")
 
- 		return NewBackendResult("554 Error: transaction timeout")
 
- 	}
 
- }
 
- func (gw *BackendGateway) Shutdown() error {
 
- 	gw.stateGuard.Lock()
 
- 	defer gw.stateGuard.Unlock()
 
- 	if gw.state != BackendStateShutdown {
 
- 		err := gw.b.Shutdown()
 
- 		if err == nil {
 
- 			close(gw.saveMailChan) // workers will stop
 
- 			gw.wg.Wait()
 
- 			gw.state = BackendStateShutdown
 
- 		}
 
- 		return err
 
- 	}
 
- 	return nil
 
- }
 
- func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 
- 	err := gw.b.Initialize(cfg)
 
- 	if err == nil {
 
- 		workersSize := gw.b.getNumberOfWorkers()
 
- 		if workersSize < 1 {
 
- 			return errors.New("Must have at least 1 worker")
 
- 		}
 
- 		if err := gw.b.testSettings(); err != nil {
 
- 			return err
 
- 		}
 
- 		gw.saveMailChan = make(chan *savePayload, workersSize)
 
- 		// start our savemail workers
 
- 		gw.wg.Add(workersSize)
 
- 		for i := 0; i < workersSize; i++ {
 
- 			go func() {
 
- 				gw.b.saveMailWorker(gw.saveMailChan)
 
- 				gw.wg.Done()
 
- 			}()
 
- 		}
 
- 	}
 
- 	return err
 
- }
 
 
  |