I/E measurements

This commit is contained in:
Michal Horejsek 2020-09-11 15:09:37 +02:00
parent 41ac61bbe8
commit f3773c9d78
10 changed files with 165 additions and 18 deletions

View File

@ -7,6 +7,7 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/)
### Added
* GODT-682 Persistent anonymous API cookies for Import-Export.
* GODT-357 Use go-message to make a better message parser.
* GODT-720 Time measurement of progress for Import-Export.
### Changed
* GODT-511 User agent format changed.

View File

@ -62,11 +62,20 @@ func (p *Progress) update() {
return
}
// In case no one listens for an update, do not block the progress.
select {
case p.updateCh <- struct{}{}:
case <-time.After(100 * time.Millisecond):
}
// In case no one listens for an update, do not block the whole progress.
go func() {
defer func() {
// updateCh can be closed at the end of progress which is fine.
if r := recover(); r != nil {
log.WithField("r", r).Warn("Failed to send update")
}
}()
select {
case p.updateCh <- struct{}{}:
case <-time.After(5 * time.Millisecond):
}
}()
}
// finish should be called as the last call once everything is done.

View File

@ -31,6 +31,8 @@ type IMAPProvider struct {
addr string
client *imapClient.Client
timeIt *timeIt
}
// NewIMAPProvider returns new IMAPProvider.
@ -39,6 +41,8 @@ func NewIMAPProvider(username, password, host, port string) (*IMAPProvider, erro
username: username,
password: password,
addr: net.JoinHostPort(host, port),
timeIt: newTimeIt("imap"),
}
if err := p.auth(); err != nil {

View File

@ -40,6 +40,9 @@ func (p *IMAPProvider) TransferTo(rules transferRules, progress *Progress, ch ch
log.Info("Started transfer from IMAP to channel")
defer log.Info("Finished transfer from IMAP to channel")
p.timeIt.clear()
defer p.timeIt.logResults()
imapMessageInfoMap := p.loadMessageInfoMap(rules, progress)
for rule := range rules.iterateActiveRules() {
@ -78,6 +81,9 @@ func (p *IMAPProvider) loadMessageInfoMap(rules transferRules, progress *Progres
}
func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValidity, count uint32) map[string]imapMessageInfo {
p.timeIt.start("load", rule.SourceMailbox.Name)
defer p.timeIt.stop("load", rule.SourceMailbox.Name)
messagesInfo := map[string]imapMessageInfo{}
pageStart := uint32(1)
@ -199,13 +205,18 @@ func (p *IMAPProvider) exportMessages(rule *Rule, progress *Progress, ch chan<-
progress.messageExported(id, body, err)
if err == nil {
msg := p.exportMessage(rule, id, imapMessage, body)
p.timeIt.stop("fetch", rule.SourceMailbox.Name)
ch <- msg
p.timeIt.start("fetch", rule.SourceMailbox.Name)
}
}
p.timeIt.start("fetch", rule.SourceMailbox.Name)
progress.callWrap(func() error {
return p.uidFetch(rule.SourceMailbox.Name, seqSet, items, processMessageCallback)
})
p.timeIt.stop("fetch", rule.SourceMailbox.Name)
}
func (p *IMAPProvider) exportMessage(rule *Rule, id string, imapMessage *imap.Message, body []byte) Message {

View File

@ -35,6 +35,8 @@ type PMAPIProvider struct {
importMsgReqMap map[string]*pmapi.ImportMsgReq // Key is msg transfer ID.
importMsgReqSize int
timeIt *timeIt
}
// NewPMAPIProvider returns new PMAPIProvider.
@ -47,6 +49,8 @@ func NewPMAPIProvider(config *pmapi.ClientConfig, clientManager ClientManager, u
importMsgReqMap: map[string]*pmapi.ImportMsgReq{},
importMsgReqSize: 0,
timeIt: newTimeIt("pmapi"),
}
if addressID != "" {

View File

@ -34,6 +34,9 @@ func (p *PMAPIProvider) TransferTo(rules transferRules, progress *Progress, ch c
log.Info("Started transfer from PMAPI to channel")
defer log.Info("Finished transfer from PMAPI to channel")
p.timeIt.clear()
defer p.timeIt.logResults()
// TransferTo cannot end sooner than loadCounts goroutine because
// loadCounts writes to channel in progress which would be closed.
// That can happen for really small accounts.
@ -147,6 +150,9 @@ func (p *PMAPIProvider) exportMessage(rule *Rule, progress *Progress, pmapiMsgID
return err
})
p.timeIt.start("build", msgID)
defer p.timeIt.stop("build", msgID)
msgBuilder := pkgMessage.NewBuilder(p.client(), msg)
msgBuilder.EncryptedToHTML = false
_, body, err := msgBuilder.BuildMessage()

View File

@ -72,6 +72,9 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch
log.Info("Started transfer from channel to PMAPI")
defer log.Info("Finished transfer from channel to PMAPI")
p.timeIt.clear()
defer p.timeIt.logResults()
// Cache has to be cleared before each transfer to not contain
// old stuff from previous cancelled run.
p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
@ -114,7 +117,10 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string
return "", errors.Wrap(err, "failed to parse message")
}
if err := message.Encrypt(p.keyRing, nil); err != nil {
p.timeIt.start("encrypt", msg.ID)
err = message.Encrypt(p.keyRing, nil)
p.timeIt.stop("encrypt", msg.ID)
if err != nil {
return "", errors.Wrap(err, "failed to encrypt draft")
}
@ -125,7 +131,7 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string
attachments := message.Attachments
message.Attachments = nil
draft, err := p.createDraft(message, "", pmapi.DraftActionReply)
draft, err := p.createDraft(msg.ID, message, "", pmapi.DraftActionReply)
if err != nil {
return "", errors.Wrap(err, "failed to create draft")
}
@ -140,13 +146,15 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string
return "", errors.Wrap(err, "failed to sign attachment")
}
p.timeIt.start("encrypt", msg.ID)
r = bytes.NewReader(attachmentBody)
encReader, err := attachment.Encrypt(p.keyRing, r)
p.timeIt.stop("encrypt", msg.ID)
if err != nil {
return "", errors.Wrap(err, "failed to encrypt attachment")
}
_, err = p.createAttachment(attachment, encReader, sigReader)
_, err = p.createAttachment(msg.ID, attachment, encReader, sigReader)
if err != nil {
return "", errors.Wrap(err, "failed to create attachment")
}
@ -176,7 +184,9 @@ func (p *PMAPIProvider) generateImportMsgReq(msg Message, globalMailbox *Mailbox
return nil, errors.Wrap(err, "failed to parse message")
}
p.timeIt.start("encrypt", msg.ID)
body, err := p.encryptMessage(message, attachmentReaders)
p.timeIt.stop("encrypt", msg.ID)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
@ -208,6 +218,9 @@ func (p *PMAPIProvider) generateImportMsgReq(msg Message, globalMailbox *Mailbox
}
func (p *PMAPIProvider) parseMessage(msg Message) (m *pmapi.Message, r []io.Reader, err error) {
p.timeIt.start("parse", msg.ID)
defer p.timeIt.stop("parse", msg.ID)
// Old message parser is panicking in some cases.
// Instead of crashing we try to convert to regular error.
defer func() {
@ -265,15 +278,14 @@ func (p *PMAPIProvider) importMessages(progress *Progress) {
importMsgIDs = append(importMsgIDs, msgID)
importMsgRequests = append(importMsgRequests, req)
}
log.WithField("msgIDs", importMsgIDs).WithField("size", p.importMsgReqSize).Debug("Importing messages")
results, err := p.importRequest(importMsgRequests)
results, err := p.importRequest(importMsgIDs[0], importMsgRequests)
// In case the whole request failed, try to import every message one by one.
if err != nil || len(results) == 0 {
log.WithError(err).Warning("Importing messages failed, trying one by one")
for msgID, req := range p.importMsgReqMap {
importedID, err := p.importMessage(progress, req)
importedID, err := p.importMessage(msgID, progress, req)
progress.messageImported(msgID, importedID, err)
}
return
@ -285,7 +297,7 @@ func (p *PMAPIProvider) importMessages(progress *Progress) {
if result.Error != nil {
log.WithError(result.Error).WithField("msg", msgID).Warning("Importing message failed, trying alone")
req := importMsgRequests[index]
importedID, err := p.importMessage(progress, req)
importedID, err := p.importMessage(msgID, progress, req)
progress.messageImported(msgID, importedID, err)
} else {
progress.messageImported(msgID, result.MessageID, nil)
@ -296,9 +308,9 @@ func (p *PMAPIProvider) importMessages(progress *Progress) {
p.importMsgReqSize = 0
}
func (p *PMAPIProvider) importMessage(progress *Progress, req *pmapi.ImportMsgReq) (importedID string, importedErr error) {
func (p *PMAPIProvider) importMessage(msgSourceID string, progress *Progress, req *pmapi.ImportMsgReq) (importedID string, importedErr error) {
progress.callWrap(func() error {
results, err := p.importRequest([]*pmapi.ImportMsgReq{req})
results, err := p.importRequest(msgSourceID, []*pmapi.ImportMsgReq{req})
if err != nil {
return errors.Wrap(err, "failed to import messages")
}

View File

@ -18,6 +18,7 @@
package transfer
import (
"fmt"
"io"
"time"
@ -71,6 +72,10 @@ func (p *PMAPIProvider) tryReconnect() error {
func (p *PMAPIProvider) listMessages(filter *pmapi.MessagesFilter) (messages []*pmapi.Message, count int, err error) {
err = p.ensureConnection(func() error {
key := fmt.Sprintf("%s_%d", filter.LabelID, filter.Page)
p.timeIt.start("listing", key)
defer p.timeIt.stop("listing", key)
messages, count, err = p.client().ListMessages(filter)
return err
})
@ -79,30 +84,42 @@ func (p *PMAPIProvider) listMessages(filter *pmapi.MessagesFilter) (messages []*
func (p *PMAPIProvider) getMessage(msgID string) (message *pmapi.Message, err error) {
err = p.ensureConnection(func() error {
p.timeIt.start("download", msgID)
defer p.timeIt.stop("download", msgID)
message, err = p.client().GetMessage(msgID)
return err
})
return
}
func (p *PMAPIProvider) importRequest(req []*pmapi.ImportMsgReq) (res []*pmapi.ImportMsgRes, err error) {
func (p *PMAPIProvider) importRequest(msgSourceID string, req []*pmapi.ImportMsgReq) (res []*pmapi.ImportMsgRes, err error) {
err = p.ensureConnection(func() error {
p.timeIt.start("upload", msgSourceID)
defer p.timeIt.stop("upload", msgSourceID)
res, err = p.client().Import(req)
return err
})
return
}
func (p *PMAPIProvider) createDraft(message *pmapi.Message, parent string, action int) (draft *pmapi.Message, err error) {
func (p *PMAPIProvider) createDraft(msgSourceID string, message *pmapi.Message, parent string, action int) (draft *pmapi.Message, err error) {
err = p.ensureConnection(func() error {
p.timeIt.start("upload", msgSourceID)
defer p.timeIt.stop("upload", msgSourceID)
draft, err = p.client().CreateDraft(message, parent, action)
return err
})
return
}
func (p *PMAPIProvider) createAttachment(att *pmapi.Attachment, r io.Reader, sig io.Reader) (created *pmapi.Attachment, err error) {
func (p *PMAPIProvider) createAttachment(msgSourceID string, att *pmapi.Attachment, r io.Reader, sig io.Reader) (created *pmapi.Attachment, err error) {
err = p.ensureConnection(func() error {
p.timeIt.start("upload", msgSourceID)
defer p.timeIt.stop("upload", msgSourceID)
created, err = p.client().CreateAttachment(att, r, sig)
return err
})

View File

@ -0,0 +1,80 @@
// Copyright (c) 2020 Proton Technologies AG
//
// This file is part of ProtonMail Bridge.
//
// ProtonMail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// ProtonMail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
package transfer
import (
"sync"
"time"
)
type timeIt struct {
lock sync.Locker
name string
groups map[string]int64
ongoing map[string]time.Time
}
func newTimeIt(name string) *timeIt {
return &timeIt{
lock: &sync.Mutex{},
name: name,
groups: map[string]int64{},
ongoing: map[string]time.Time{},
}
}
func (t *timeIt) clear() {
t.lock.Lock()
defer t.lock.Unlock()
t.groups = map[string]int64{}
t.ongoing = map[string]time.Time{}
}
func (t *timeIt) start(group, id string) {
t.lock.Lock()
defer t.lock.Unlock()
t.ongoing[group+"/"+id] = time.Now()
}
func (t *timeIt) stop(group, id string) {
endTime := time.Now()
t.lock.Lock()
defer t.lock.Unlock()
startTime, ok := t.ongoing[group+"/"+id]
if !ok {
log.WithField("group", group).WithField("id", id).Error("Stop called before start")
return
}
delete(t.ongoing, group+"/"+id)
diff := endTime.Sub(startTime).Milliseconds()
t.groups[group] += diff
}
func (t *timeIt) logResults() {
t.lock.Lock()
defer t.lock.Unlock()
// Print also ongoing to be sure that nothing was left out.
// Basically ongoing should be empty.
log.WithField("name", t.name).WithField("result", t.groups).WithField("ongoing", t.ongoing).Debug("Time measurement")
}

View File

@ -181,7 +181,10 @@ func (t *Transfer) Start() *Progress {
reportFile := newFileReport(t.logDir, t.id)
progress := newProgress(log, reportFile)
ch := make(chan Message)
// Small queue to prevent having idle source while target is blocked.
// E.g., when upload to PM is in progress, we can in meantime download
// the next batch from remote IMAP server.
ch := make(chan Message, 10)
go func() {
defer t.panicHandler.HandlePanic()