PMAPI target - parallel upload

This commit is contained in:
Michal Horejsek 2020-09-16 15:03:40 +02:00
parent 9b5da91f7c
commit 7337f78d4a
2 changed files with 32 additions and 11 deletions

View File

@ -14,6 +14,7 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/)
* Unsilent errors reading mbox files.
* GODT-692 QA build with option to change API URL by ENV variable.
* GODT-704 User agent detected by fake IMAP extension instead of AUTH callback (some clients use LOGIN instead of AUTH).
* GODT-695 Parallel upload for ProtonMail target.
### Removed
* GODT-519 Unused AUTH scope parsing methods.

View File

@ -22,6 +22,7 @@ import (
"fmt"
"io"
"io/ioutil"
"sync"
pkgMessage "github.com/ProtonMail/proton-bridge/pkg/message"
"github.com/ProtonMail/proton-bridge/pkg/pmapi"
@ -32,6 +33,7 @@ import (
const (
pmapiImportBatchMaxItems = 10
pmapiImportBatchMaxSize = 25 * 1000 * 1000 // 25 MB
pmapiImportWorkers = 5
)
// DefaultMailboxes returns the default mailboxes for default rules if no other is found.
@ -80,6 +82,9 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch
p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
p.importMsgReqSize = 0
importMsgReqMapCh := make(chan map[string]*pmapi.ImportMsgReq)
wg := p.runImporting(progress, importMsgReqMapCh)
for msg := range ch {
if progress.shouldStop() {
break
@ -88,13 +93,15 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch
if p.isMessageDraft(msg) {
p.transferDraft(rules, progress, msg)
} else {
p.transferMessage(rules, progress, msg)
p.transferMessage(rules, progress, msg, importMsgReqMapCh)
}
}
if len(p.importMsgReqMap) > 0 {
p.importMessages(progress)
importMsgReqMapCh <- p.importMsgReqMap
}
close(importMsgReqMapCh)
wg.Wait()
}
func (p *PMAPIProvider) isMessageDraft(msg Message) bool {
@ -163,7 +170,7 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string
return draft.ID, nil
}
func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message) {
func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) {
importMsgReq, err := p.generateImportMsgReq(msg, rules.globalMailbox)
if err != nil {
progress.messageImported(msg.ID, "", err)
@ -172,7 +179,9 @@ func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress,
importMsgReqSize := len(importMsgReq.Body)
if p.importMsgReqSize+importMsgReqSize > pmapiImportBatchMaxSize || len(p.importMsgReqMap) == pmapiImportBatchMaxItems {
p.importMessages(progress)
importMsgReqMapCh <- p.importMsgReqMap
p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
p.importMsgReqSize = 0
}
p.importMsgReqMap[msg.ID] = importMsgReq
p.importMsgReqSize += importMsgReqSize
@ -267,24 +276,38 @@ func computeMessageFlags(labels []string) (flag int64) {
return flag
}
func (p *PMAPIProvider) importMessages(progress *Progress) {
func (p *PMAPIProvider) runImporting(progress *Progress, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) *sync.WaitGroup {
var wg sync.WaitGroup
for i := 0; i < pmapiImportWorkers; i++ {
wg.Add(1)
go func() {
for importMsgReqMap := range importMsgReqMapCh {
p.importMessages(progress, importMsgReqMap)
}
wg.Done()
}()
}
return &wg
}
func (p *PMAPIProvider) importMessages(progress *Progress, importMsgReqMap map[string]*pmapi.ImportMsgReq) {
if progress.shouldStop() {
return
}
importMsgIDs := []string{}
importMsgRequests := []*pmapi.ImportMsgReq{}
for msgID, req := range p.importMsgReqMap {
for msgID, req := range importMsgReqMap {
importMsgIDs = append(importMsgIDs, msgID)
importMsgRequests = append(importMsgRequests, req)
}
log.WithField("msgIDs", importMsgIDs).WithField("size", p.importMsgReqSize).Debug("Importing messages")
log.WithField("msgIDs", importMsgIDs).Trace("Importing messages")
results, err := p.importRequest(importMsgIDs[0], importMsgRequests)
// In case the whole request failed, try to import every message one by one.
if err != nil || len(results) == 0 {
log.WithError(err).Warning("Importing messages failed, trying one by one")
for msgID, req := range p.importMsgReqMap {
for msgID, req := range importMsgReqMap {
importedID, err := p.importMessage(msgID, progress, req)
progress.messageImported(msgID, importedID, err)
}
@ -303,9 +326,6 @@ func (p *PMAPIProvider) importMessages(progress *Progress) {
progress.messageImported(msgID, result.MessageID, nil)
}
}
p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
p.importMsgReqSize = 0
}
func (p *PMAPIProvider) importMessage(msgSourceID string, progress *Progress, req *pmapi.ImportMsgReq) (importedID string, importedErr error) {