Move mailer to use a queue (#9789)

* Move mailer to use a queue

* Make sectionMap map[string]bool

* Ensure that Message is json encodable
pull/9774/head^2
zeripath 5 years ago committed by Antoine GIRARD
parent 06cd3e03a2
commit c76c70a16c
  1. 24
      modules/setting/queue.go
  2. 2
      services/mailer/mail.go
  3. 29
      services/mailer/mail_test.go
  4. 88
      services/mailer/mailer.go

@ -103,11 +103,11 @@ func NewQueueService() {
// Now handle the old issue_indexer configuration // Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer") section := Cfg.Section("queue.issue_indexer")
issueIndexerSectionMap := map[string]string{} sectionMap := map[string]bool{}
for _, key := range section.Keys() { for _, key := range section.Keys() {
issueIndexerSectionMap[key.Name()] = key.Value() sectionMap[key.Name()] = true
} }
if _, ok := issueIndexerSectionMap["TYPE"]; !ok { if _, ok := sectionMap["TYPE"]; !ok {
switch Indexer.IssueQueueType { switch Indexer.IssueQueueType {
case LevelQueueType: case LevelQueueType:
section.Key("TYPE").SetValue("level") section.Key("TYPE").SetValue("level")
@ -120,18 +120,28 @@ func NewQueueService() {
Indexer.IssueQueueType) Indexer.IssueQueueType)
} }
} }
if _, ok := issueIndexerSectionMap["LENGTH"]; !ok { if _, ok := sectionMap["LENGTH"]; !ok {
section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength)) section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
} }
if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok { if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
} }
if _, ok := issueIndexerSectionMap["DATADIR"]; !ok { if _, ok := sectionMap["DATADIR"]; !ok {
section.Key("DATADIR").SetValue(Indexer.IssueQueueDir) section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
} }
if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok { if _, ok := sectionMap["CONN_STR"]; !ok {
section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr) section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
} }
// Handle the old mailer configuration
section = Cfg.Section("queue.mailer")
sectionMap = map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
}
} }
// ParseQueueConnStr parses a queue connection string // ParseQueueConnStr parses a queue connection string

@ -51,7 +51,7 @@ func InitMailRender(subjectTpl *texttmpl.Template, bodyTpl *template.Template) {
// SendTestMail sends a test mail // SendTestMail sends a test mail
func SendTestMail(email string) error { func SendTestMail(email string) error {
return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").Message) return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").ToMessage())
} }
// SendUserMail sends a mail to the user // SendUserMail sends a mail to the user

@ -61,11 +61,11 @@ func TestComposeIssueCommentMessage(t *testing.T) {
msgs := composeIssueCommentMessages(&mailCommentContext{Issue: issue, Doer: doer, ActionType: models.ActionCommentIssue, msgs := composeIssueCommentMessages(&mailCommentContext{Issue: issue, Doer: doer, ActionType: models.ActionCommentIssue,
Content: "test body", Comment: comment}, tos, false, "issue comment") Content: "test body", Comment: comment}, tos, false, "issue comment")
assert.Len(t, msgs, 2) assert.Len(t, msgs, 2)
gomailMsg := msgs[0].ToMessage()
mailto := msgs[0].GetHeader("To") mailto := gomailMsg.GetHeader("To")
subject := msgs[0].GetHeader("Subject") subject := gomailMsg.GetHeader("Subject")
inreplyTo := msgs[0].GetHeader("In-Reply-To") inreplyTo := gomailMsg.GetHeader("In-Reply-To")
references := msgs[0].GetHeader("References") references := gomailMsg.GetHeader("References")
assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field") assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
assert.Equal(t, "Re: ", subject[0][:4], "Comment reply subject should contain Re:") assert.Equal(t, "Re: ", subject[0][:4], "Comment reply subject should contain Re:")
@ -96,14 +96,15 @@ func TestComposeIssueMessage(t *testing.T) {
Content: "test body"}, tos, false, "issue create") Content: "test body"}, tos, false, "issue create")
assert.Len(t, msgs, 2) assert.Len(t, msgs, 2)
mailto := msgs[0].GetHeader("To") gomailMsg := msgs[0].ToMessage()
subject := msgs[0].GetHeader("Subject") mailto := gomailMsg.GetHeader("To")
messageID := msgs[0].GetHeader("Message-ID") subject := gomailMsg.GetHeader("Subject")
messageID := gomailMsg.GetHeader("Message-ID")
assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field") assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
assert.Equal(t, "[user2/repo1] @user2 #1 - issue1", subject[0]) assert.Equal(t, "[user2/repo1] @user2 #1 - issue1", subject[0])
assert.Nil(t, msgs[0].GetHeader("In-Reply-To")) assert.Nil(t, gomailMsg.GetHeader("In-Reply-To"))
assert.Nil(t, msgs[0].GetHeader("References")) assert.Nil(t, gomailMsg.GetHeader("References"))
assert.Equal(t, messageID[0], "<user2/repo1/issues/1@localhost>", "Message-ID header doesn't match") assert.Equal(t, messageID[0], "<user2/repo1/issues/1@localhost>", "Message-ID header doesn't match")
} }
@ -134,9 +135,9 @@ func TestTemplateSelection(t *testing.T) {
InitMailRender(stpl, btpl) InitMailRender(stpl, btpl)
expect := func(t *testing.T, msg *Message, expSubject, expBody string) { expect := func(t *testing.T, msg *Message, expSubject, expBody string) {
subject := msg.GetHeader("Subject") subject := msg.ToMessage().GetHeader("Subject")
msgbuf := new(bytes.Buffer) msgbuf := new(bytes.Buffer)
_, _ = msg.WriteTo(msgbuf) _, _ = msg.ToMessage().WriteTo(msgbuf)
wholemsg := msgbuf.String() wholemsg := msgbuf.String()
assert.Equal(t, []string{expSubject}, subject) assert.Equal(t, []string{expSubject}, subject)
assert.Contains(t, wholemsg, expBody) assert.Contains(t, wholemsg, expBody)
@ -188,9 +189,9 @@ func TestTemplateServices(t *testing.T) {
msg := testComposeIssueCommentMessage(t, &mailCommentContext{Issue: issue, Doer: doer, ActionType: actionType, msg := testComposeIssueCommentMessage(t, &mailCommentContext{Issue: issue, Doer: doer, ActionType: actionType,
Content: "test body", Comment: comment}, tos, fromMention, "TestTemplateServices") Content: "test body", Comment: comment}, tos, fromMention, "TestTemplateServices")
subject := msg.GetHeader("Subject") subject := msg.ToMessage().GetHeader("Subject")
msgbuf := new(bytes.Buffer) msgbuf := new(bytes.Buffer)
_, _ = msg.WriteTo(msgbuf) _, _ = msg.ToMessage().WriteTo(msgbuf)
wholemsg := msgbuf.String() wholemsg := msgbuf.String()
assert.Equal(t, []string{expSubject}, subject) assert.Equal(t, []string{expSubject}, subject)

@ -18,7 +18,9 @@ import (
"time" "time"
"code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"github.com/jaytaylor/html2text" "github.com/jaytaylor/html2text"
@ -28,37 +30,62 @@ import (
// Message mail body and log info // Message mail body and log info
type Message struct { type Message struct {
Info string // Message information for log purpose. Info string // Message information for log purpose.
*gomail.Message FromAddress string
FromDisplayName string
To []string
Subject string
Date time.Time
Body string
Headers map[string][]string
} }
// NewMessageFrom creates new mail message object with custom From header. // ToMessage converts a Message to gomail.Message
func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message { func (m *Message) ToMessage() *gomail.Message {
log.Trace("NewMessageFrom (body):\n%s", body)
msg := gomail.NewMessage() msg := gomail.NewMessage()
msg.SetAddressHeader("From", fromAddress, fromDisplayName) msg.SetAddressHeader("From", m.FromAddress, m.FromDisplayName)
msg.SetHeader("To", to...) msg.SetHeader("To", m.To...)
for header := range m.Headers {
msg.SetHeader(header, m.Headers[header]...)
}
if len(setting.MailService.SubjectPrefix) > 0 { if len(setting.MailService.SubjectPrefix) > 0 {
msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+subject) msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+m.Subject)
} else { } else {
msg.SetHeader("Subject", subject) msg.SetHeader("Subject", m.Subject)
} }
msg.SetDateHeader("Date", time.Now()) msg.SetDateHeader("Date", m.Date)
msg.SetHeader("X-Auto-Response-Suppress", "All") msg.SetHeader("X-Auto-Response-Suppress", "All")
plainBody, err := html2text.FromString(body) plainBody, err := html2text.FromString(m.Body)
if err != nil || setting.MailService.SendAsPlainText { if err != nil || setting.MailService.SendAsPlainText {
if strings.Contains(base.TruncateString(body, 100), "<html>") { if strings.Contains(base.TruncateString(m.Body, 100), "<html>") {
log.Warn("Mail contains HTML but configured to send as plain text.") log.Warn("Mail contains HTML but configured to send as plain text.")
} }
msg.SetBody("text/plain", plainBody) msg.SetBody("text/plain", plainBody)
} else { } else {
msg.SetBody("text/plain", plainBody) msg.SetBody("text/plain", plainBody)
msg.AddAlternative("text/html", body) msg.AddAlternative("text/html", m.Body)
} }
return msg
}
// SetHeader adds additional headers to a message
func (m *Message) SetHeader(field string, value ...string) {
m.Headers[field] = value
}
// NewMessageFrom creates new mail message object with custom From header.
func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message {
log.Trace("NewMessageFrom (body):\n%s", body)
return &Message{ return &Message{
Message: msg, FromAddress: fromAddress,
FromDisplayName: fromDisplayName,
To: to,
Subject: subject,
Date: time.Now(),
Body: body,
Headers: map[string][]string{},
} }
} }
@ -257,18 +284,7 @@ func (s *dummySender) Send(from string, to []string, msg io.WriterTo) error {
return nil return nil
} }
func processMailQueue() { var mailQueue queue.Queue
for msg := range mailQueue {
log.Trace("New e-mail sending request %s: %s", msg.GetHeader("To"), msg.Info)
if err := gomail.Send(Sender, msg.Message); err != nil {
log.Error("Failed to send emails %s: %s - %v", msg.GetHeader("To"), msg.Info, err)
} else {
log.Trace("E-mails sent %s: %s", msg.GetHeader("To"), msg.Info)
}
}
}
var mailQueue chan *Message
// Sender sender for sending mail synchronously // Sender sender for sending mail synchronously
var Sender gomail.Sender var Sender gomail.Sender
@ -291,14 +307,26 @@ func NewContext() {
Sender = &dummySender{} Sender = &dummySender{}
} }
mailQueue = make(chan *Message, setting.MailService.QueueLength) mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) {
go processMailQueue() for _, datum := range data {
msg := datum.(*Message)
gomailMsg := msg.ToMessage()
log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info)
if err := gomail.Send(Sender, gomailMsg); err != nil {
log.Error("Failed to send emails %s: %s - %v", gomailMsg.GetHeader("To"), msg.Info, err)
} else {
log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info)
}
}
}, &Message{})
go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
} }
// SendAsync send mail asynchronously // SendAsync send mail asynchronously
func SendAsync(msg *Message) { func SendAsync(msg *Message) {
go func() { go func() {
mailQueue <- msg _ = mailQueue.Push(msg)
}() }()
} }
@ -306,7 +334,7 @@ func SendAsync(msg *Message) {
func SendAsyncs(msgs []*Message) { func SendAsyncs(msgs []*Message) {
go func() { go func() {
for _, msg := range msgs { for _, msg := range msgs {
mailQueue <- msg _ = mailQueue.Push(msg)
} }
}() }()
} }

Loading…
Cancel
Save