Fix transfer stopping

This commit is contained in:
Michal Horejsek 2020-10-20 10:37:41 +02:00
parent 51b6f95342
commit 719d369c2a
5 changed files with 45 additions and 3 deletions

View File

@ -10,6 +10,10 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/)
* GODT-662 Do not resume paused transfer progress after dismissing cancel popup.
* GODT-772 Sanitize mailbox names for exporting to follow OS restrictions.
* GODT-771 Show fatal errors after export is terminated.
* GODT-779 Do not propagate updates when progress is stopped.
* GODT-779 Unpause progress during fatal error to properly stop progress.
* GODT-779 Stop ongoing transfer calls sooner (re-check after import request is generated).
* Fix measurement of uploading attachments during transfer.
### Changed
* Bump crypto version to v0.0.0-20200818122824-ed5d25e28db8

View File

@ -93,7 +93,7 @@ func (p *Progress) fatal(err error) {
defer p.lock.Unlock()
log.WithError(err).Error("Progress finished")
p.isStopped = true
p.setStop()
p.fatalError = err
p.cleanUpdateCh()
}
@ -282,6 +282,15 @@ func (p *Progress) Stop() {
defer p.update()
p.log.Info("Progress stopped")
p.setStop()
// Once progress is stopped, some calls might be in progress. Results from
// those calls are irrelevant so we can close update channel sooner to not
// propagate any progress to user interface anymore.
p.cleanUpdateCh()
}
func (p *Progress) setStop() {
p.isStopped = true
p.pauseReason = "" // Clear pause to run paused code and stop it.
}

View File

@ -19,6 +19,7 @@ package transfer
import (
"testing"
"time"
"github.com/pkg/errors"
a "github.com/stretchr/testify/assert"
@ -104,6 +105,28 @@ func TestProgressFatalError(t *testing.T) {
r.NotPanics(t, func() { progress.addMessage("msg", nil) })
}
func TestFailUnpauseAndStops(t *testing.T) {
progress := newProgress(log, nil)
drainProgressUpdateChannel(&progress)
progress.Pause("pausing")
progress.fatal(errors.New("fatal error"))
r.Nil(t, progress.updateCh)
r.True(t, progress.isStopped)
r.False(t, progress.IsPaused())
r.Eventually(t, progress.shouldStop, 2*time.Millisecond, time.Millisecond)
}
func TestStopClosesUpdates(t *testing.T) {
progress := newProgress(log, nil)
ch := progress.updateCh
progress.Stop()
r.Nil(t, progress.updateCh)
r.PanicsWithError(t, "send on closed channel", func() { ch <- struct{}{} })
}
func drainProgressUpdateChannel(progress *Progress) {
// updateCh is not needed to drain under tests - timeout is implemented.
// But timeout takes time which would slow down tests.

View File

@ -177,6 +177,10 @@ func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress,
return
}
if progress.shouldStop() {
return
}
importMsgReqSize := len(importMsgReq.Body)
if p.nextImportRequestsSize+importMsgReqSize > pmapiImportBatchMaxSize || len(p.nextImportRequests) == pmapiImportBatchMaxItems {
preparedImportRequestsCh <- p.nextImportRequests

View File

@ -118,8 +118,10 @@ func (p *PMAPIProvider) createDraft(msgSourceID string, message *pmapi.Message,
func (p *PMAPIProvider) createAttachment(msgSourceID string, att *pmapi.Attachment, r io.Reader, sig io.Reader) (created *pmapi.Attachment, err error) {
err = p.ensureConnection(func() error {
p.timeIt.start("upload", msgSourceID)
defer p.timeIt.stop("upload", msgSourceID)
// Use some attributes from attachment to have unique key for each call.
key := fmt.Sprintf("%s_%s_%d", msgSourceID, att.Name, att.Size)
p.timeIt.start("upload", key)
defer p.timeIt.stop("upload", key)
created, err = p.client().CreateAttachment(att, r, sig)
return err