GODT-2202: Report update errors from Gluon

For every update sent to gluon wait and check the error code to see if
an error occurred.

Note: Updates can't be inspect on the call site as it can lead to
deadlocks.
This commit is contained in:
Leander Beernaert 2023-01-13 15:24:09 +01:00
parent 931ed119bb
commit 93c7552a41
7 changed files with 169 additions and 91 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.18
require (
github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557
github.com/Masterminds/semver/v3 v3.1.1
github.com/ProtonMail/gluon v0.14.2-0.20230112101229-07a5a074643e
github.com/ProtonMail/gluon v0.14.2-0.20230113145313-7dc070e73340
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a
github.com/ProtonMail/go-proton-api v0.2.4-0.20230112102613-6ad201cdb337
github.com/ProtonMail/go-rfc5322 v0.11.0

4
go.sum
View File

@ -28,8 +28,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs
github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo=
github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkFn/XLzSqPOB5AAthuk9xPk=
github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g=
github.com/ProtonMail/gluon v0.14.2-0.20230112101229-07a5a074643e h1:3UfVqUoBAh8R+FO+F+l0XkXerO6v0lD8waMFfwkdP/c=
github.com/ProtonMail/gluon v0.14.2-0.20230112101229-07a5a074643e/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q=
github.com/ProtonMail/gluon v0.14.2-0.20230113145313-7dc070e73340 h1:NrE0XbpppwSPRDbhK0LMoyIkE/+89Nj83MF9jg/f0X8=
github.com/ProtonMail/gluon v0.14.2-0.20230113145313-7dc070e73340/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q=
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4=
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4=
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=

View File

@ -294,27 +294,43 @@ func (user *User) handleLabelEvents(ctx context.Context, labelEvents []proton.La
for _, event := range labelEvents {
switch event.Action {
case proton.EventCreate:
if err := user.handleCreateLabelEvent(ctx, event); err != nil {
updates, err := user.handleCreateLabelEvent(ctx, event)
if err != nil {
return fmt.Errorf("failed to handle create label event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
case proton.EventUpdate, proton.EventUpdateFlags:
if err := user.handleUpdateLabelEvent(ctx, event); err != nil {
updates, err := user.handleUpdateLabelEvent(ctx, event)
if err != nil {
return fmt.Errorf("failed to handle update label event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
case proton.EventDelete:
if err := user.handleDeleteLabelEvent(ctx, event); err != nil {
updates, err := user.handleDeleteLabelEvent(ctx, event)
if err != nil {
return fmt.Errorf("failed to handle delete label event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
}
}
return nil
}
func (user *User) handleCreateLabelEvent(_ context.Context, event proton.LabelEvent) error { //nolint:unparam
return safe.LockRet(func() error {
func (user *User) handleCreateLabelEvent(ctx context.Context, event proton.LabelEvent) ([]imap.Update, error) { //nolint:unparam
return safe.LockRetErr(func() ([]imap.Update, error) {
var updates []imap.Update
user.log.WithFields(logrus.Fields{
"labelID": event.ID,
"name": logging.Sensitive(event.Label.Name),
@ -325,7 +341,9 @@ func (user *User) handleCreateLabelEvent(_ context.Context, event proton.LabelEv
}
for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) {
updateCh.Enqueue(newMailboxCreatedUpdate(imap.MailboxID(event.ID), getMailboxName(event.Label)))
update := newMailboxCreatedUpdate(imap.MailboxID(event.ID), getMailboxName(event.Label))
updateCh.Enqueue(update)
updates = append(updates, update)
}
user.eventCh.Enqueue(events.UserLabelCreated{
@ -334,12 +352,13 @@ func (user *User) handleCreateLabelEvent(_ context.Context, event proton.LabelEv
Name: event.Label.Name,
})
return nil
return updates, nil
}, user.apiLabelsLock, user.updateChLock)
}
func (user *User) handleUpdateLabelEvent(_ context.Context, event proton.LabelEvent) error { //nolint:unparam
return safe.LockRet(func() error {
func (user *User) handleUpdateLabelEvent(ctx context.Context, event proton.LabelEvent) ([]imap.Update, error) { //nolint:unparam
return safe.LockRetErr(func() ([]imap.Update, error) {
var updates []imap.Update
user.log.WithFields(logrus.Fields{
"labelID": event.ID,
"name": logging.Sensitive(event.Label.Name),
@ -350,10 +369,12 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event proton.LabelEv
}
for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) {
updateCh.Enqueue(imap.NewMailboxUpdated(
update := imap.NewMailboxUpdated(
imap.MailboxID(event.ID),
getMailboxName(event.Label),
))
)
updateCh.Enqueue(update)
updates = append(updates, update)
}
user.eventCh.Enqueue(events.UserLabelUpdated{
@ -362,12 +383,14 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event proton.LabelEv
Name: event.Label.Name,
})
return nil
return updates, nil
}, user.apiLabelsLock, user.updateChLock)
}
func (user *User) handleDeleteLabelEvent(_ context.Context, event proton.LabelEvent) error { //nolint:unparam
return safe.LockRet(func() error {
func (user *User) handleDeleteLabelEvent(ctx context.Context, event proton.LabelEvent) ([]imap.Update, error) { //nolint:unparam
return safe.LockRetErr(func() ([]imap.Update, error) {
var updates []imap.Update
user.log.WithField("labelID", event.ID).Info("Handling label deleted event")
label, ok := user.apiLabels[event.ID]
@ -376,7 +399,9 @@ func (user *User) handleDeleteLabelEvent(_ context.Context, event proton.LabelEv
}
for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) {
updateCh.Enqueue(imap.NewMailboxDeleted(imap.MailboxID(event.ID)))
update := imap.NewMailboxDeleted(imap.MailboxID(event.ID))
updateCh.Enqueue(update)
updates = append(updates, update)
}
user.eventCh.Enqueue(events.UserLabelDeleted{
@ -385,7 +410,7 @@ func (user *User) handleDeleteLabelEvent(_ context.Context, event proton.LabelEv
Name: label.Name,
})
return nil
return updates, nil
}, user.apiLabelsLock, user.updateChLock)
}
@ -396,26 +421,32 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
switch event.Action {
case proton.EventCreate:
if err := user.handleCreateMessageEvent(
updates, err := user.handleCreateMessageEvent(
logging.WithLogrusField(ctx, "action", "create message"),
event,
); err != nil {
event)
if err != nil {
if rerr := user.reporter.ReportMessageWithContext("Failed to apply create message event", reporter.Context{
"error": err,
}); rerr != nil {
user.log.WithError(err).Error("Failed to report create message event error")
}
return fmt.Errorf("failed to handle create message event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
case proton.EventUpdate, proton.EventUpdateFlags:
// Draft update means to completely remove old message and upload the new data again, but we should
// only do this if the event is of type EventUpdate otherwise label switch operations will not work.
if event.Message.IsDraft() && event.Action == proton.EventUpdate {
if err := user.handleUpdateDraftEvent(
updates, err := user.handleUpdateDraftEvent(
logging.WithLogrusField(ctx, "action", "update draft"),
event,
); err != nil {
)
if err != nil {
if rerr := user.reporter.ReportMessageWithContext("Failed to apply update draft message event", reporter.Context{
"error": err,
}); rerr != nil {
@ -424,6 +455,10 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
return fmt.Errorf("failed to handle update draft event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
return nil
}
@ -431,10 +466,11 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
// whether the flags, labels or read only data (header+body) has been changed. This requires fixing proton
// first so that it correctly reports those cases.
// Issue regular update to handle mailboxes and flag changes.
if err := user.handleUpdateMessageEvent(
updates, err := user.handleUpdateMessageEvent(
logging.WithLogrusField(ctx, "action", "update message"),
event,
); err != nil {
)
if err != nil {
if rerr := user.reporter.ReportMessageWithContext("Failed to apply update message event", reporter.Context{
"error": err,
}); rerr != nil {
@ -443,11 +479,16 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
return fmt.Errorf("failed to handle update message event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
case proton.EventDelete:
if err := user.handleDeleteMessageEvent(
updates, err := user.handleDeleteMessageEvent(
logging.WithLogrusField(ctx, "action", "delete message"),
event,
); err != nil {
)
if err != nil {
if rerr := user.reporter.ReportMessageWithContext("Failed to apply delete message event", reporter.Context{
"error": err,
}); rerr != nil {
@ -455,25 +496,30 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
}
return fmt.Errorf("failed to handle delete message event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
}
}
return nil
}
func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.MessageEvent) error {
func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) {
full, err := user.client.GetFullMessage(ctx, event.Message.ID)
if err != nil {
return fmt.Errorf("failed to get full message: %w", err)
return nil, fmt.Errorf("failed to get full message: %w", err)
}
return safe.RLockRet(func() error {
return safe.RLockRetErr(func() ([]imap.Update, error) {
user.log.WithFields(logrus.Fields{
"messageID": event.ID,
"subject": logging.Sensitive(event.Message.Subject),
}).Info("Handling message created event")
return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
var update imap.Update
if err := withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
res := buildRFC822(user.apiLabels, full, addrKR)
if res.err != nil {
@ -497,45 +543,56 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes
user.log.WithError(err).Error("Failed to remove failed message ID from vault")
}
user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(false, res.update))
update = imap.NewMessagesCreated(false, res.update)
user.updateCh[full.AddressID].Enqueue(update)
return nil
})
}); err != nil {
return nil, err
}
return []imap.Update{update}, nil
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock)
}
func (user *User) handleUpdateMessageEvent(ctx context.Context, event proton.MessageEvent) error { //nolint:unparam
return safe.RLockRet(func() error {
func (user *User) handleUpdateMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { //nolint:unparam
return safe.RLockRetErr(func() ([]imap.Update, error) {
user.log.WithFields(logrus.Fields{
"messageID": event.ID,
"subject": logging.Sensitive(event.Message.Subject),
}).Info("Handling message updated event")
user.updateCh[event.Message.AddressID].Enqueue(imap.NewMessageMailboxesUpdated(
update := imap.NewMessageMailboxesUpdated(
imap.MessageID(event.ID),
mapTo[string, imap.MailboxID](wantLabels(user.apiLabels, event.Message.LabelIDs)),
event.Message.Seen(),
event.Message.Starred(),
))
)
return nil
user.updateCh[event.Message.AddressID].Enqueue(update)
return []imap.Update{update}, nil
}, user.apiLabelsLock, user.updateChLock)
}
func (user *User) handleDeleteMessageEvent(ctx context.Context, event proton.MessageEvent) error { //nolint:unparam
return safe.RLockRet(func() error {
func (user *User) handleDeleteMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { //nolint:unparam
return safe.RLockRetErr(func() ([]imap.Update, error) {
user.log.WithField("messageID", event.ID).Info("Handling message deleted event")
var updates []imap.Update
for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) {
updateCh.Enqueue(imap.NewMessagesDeleted(imap.MessageID(event.ID)))
update := imap.NewMessagesDeleted(imap.MessageID(event.ID))
updateCh.Enqueue(update)
updates = append(updates, update)
}
return nil
return updates, nil
}, user.updateChLock)
}
func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.MessageEvent) error { //nolint:unparam
return safe.RLockRet(func() error {
func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { //nolint:unparam
return safe.RLockRetErr(func() ([]imap.Update, error) {
user.log.WithFields(logrus.Fields{
"messageID": event.ID,
"subject": logging.Sensitive(event.Message.Subject),
@ -543,10 +600,12 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.Messa
full, err := user.client.GetFullMessage(ctx, event.Message.ID)
if err != nil {
return fmt.Errorf("failed to get full draft: %w", err)
return nil, fmt.Errorf("failed to get full draft: %w", err)
}
return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
var update imap.Update
if err := withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
res := buildRFC822(user.apiLabels, full, addrKR)
if res.err != nil {
@ -570,15 +629,21 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.Messa
user.log.WithError(err).Error("Failed to remove failed message ID from vault")
}
user.updateCh[full.AddressID].Enqueue(imap.NewMessageUpdated(
update = imap.NewMessageUpdated(
res.update.Message,
res.update.Literal,
res.update.MailboxIDs,
res.update.ParsedMessage,
))
)
user.updateCh[full.AddressID].Enqueue(update)
return nil
})
}); err != nil {
return nil, err
}
return []imap.Update{update}, nil
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock)
}
@ -602,3 +667,14 @@ func getMailboxName(label proton.Label) []string {
return name
}
func waitOnIMAPUpdates(ctx context.Context, updates []imap.Update) error {
for _, update := range updates {
err, ok := update.WaitContext(ctx)
if ok && err != nil {
return fmt.Errorf("failed to apply gluon update %v :%w", update.String(), err)
}
}
return nil
}

View File

@ -164,10 +164,14 @@ func (user *User) sync(ctx context.Context) error {
// nolint:exhaustive
func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh ...*queue.QueuedChannel[imap.Update]) error {
var updates []imap.Update
// Create placeholder Folders/Labels mailboxes with a random ID and with the \Noselect attribute.
for _, prefix := range []string{folderPrefix, labelPrefix} {
for _, updateCh := range updateCh {
updateCh.Enqueue(newPlaceHolderMailboxCreatedUpdate(prefix))
update := newPlaceHolderMailboxCreatedUpdate(prefix)
updateCh.Enqueue(update)
updates = append(updates, update)
}
}
@ -180,12 +184,16 @@ func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh
switch label.Type {
case proton.LabelTypeSystem:
for _, updateCh := range updateCh {
updateCh.Enqueue(newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name))
update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)
updateCh.Enqueue(update)
updates = append(updates, update)
}
case proton.LabelTypeFolder, proton.LabelTypeLabel:
for _, updateCh := range updateCh {
updateCh.Enqueue(newMailboxCreatedUpdate(imap.MailboxID(labelID), getMailboxName(label)))
update := newMailboxCreatedUpdate(imap.MailboxID(labelID), getMailboxName(label))
updateCh.Enqueue(update)
updates = append(updates, update)
}
default:
@ -194,11 +202,11 @@ func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh
}
// Wait for all label updates to be applied.
for _, updateCh := range updateCh {
update := imap.NewNoop()
defer update.WaitContext(ctx)
updateCh.Enqueue(update)
for _, update := range updates {
err, ok := update.WaitContext(ctx)
if ok && err != nil {
return fmt.Errorf("failed to apply label create update in gluon %v: %w", update.String(), err)
}
}
return nil
@ -243,9 +251,9 @@ func syncMessages(
defer syncReporter.done()
type flushUpdate struct {
messageID string
noOps []*imap.Noop
batchLen int
messageID string
pushedUpdates []imap.Update
batchLen int
}
// The higher this value, the longer we can continue our download iteration before being blocked on channel writes
@ -322,29 +330,26 @@ func syncMessages(
flushers[res.addressID].push(res.update)
}
var pushedUpdates []imap.Update
for _, flusher := range flushers {
flusher.flush()
}
noopUpdates := make([]*imap.Noop, len(updateCh))
index := 0
for _, updateCh := range updateCh {
noopUpdates[index] = imap.NewNoop()
updateCh.Enqueue(noopUpdates[index])
index++
pushedUpdates = append(pushedUpdates, flusher.collectPushedUpdates()...)
}
flushUpdateCh <- flushUpdate{
messageID: batch[0].messageID,
noOps: noopUpdates,
batchLen: len(batch),
messageID: batch[0].messageID,
pushedUpdates: pushedUpdates,
batchLen: len(batch),
}
}
}()
for flushUpdate := range flushUpdateCh {
for _, up := range flushUpdate.noOps {
up.WaitContext(ctx)
for _, up := range flushUpdate.pushedUpdates {
err, ok := up.WaitContext(ctx)
if ok && err != nil {
return fmt.Errorf("failed to apply sync update to gluon %v: %w", up.String(), err)
}
}
if err := vault.SetLastMessageID(flushUpdate.messageID); err != nil {

View File

@ -23,8 +23,9 @@ import (
)
type flusher struct {
updateCh *queue.QueuedChannel[imap.Update]
updates []*imap.MessageCreated
updateCh *queue.QueuedChannel[imap.Update]
updates []*imap.MessageCreated
pushedUpdates []imap.Update
maxUpdateSize int
curChunkSize int
@ -47,8 +48,16 @@ func (f *flusher) push(update *imap.MessageCreated) {
func (f *flusher) flush() {
if len(f.updates) > 0 {
f.updateCh.Enqueue(imap.NewMessagesCreated(true, f.updates...))
update := imap.NewMessagesCreated(true, f.updates...)
f.updateCh.Enqueue(update)
f.updates = nil
f.curChunkSize = 0
f.pushedUpdates = append(f.pushedUpdates, update)
}
}
func (f *flusher) collectPushedUpdates() []imap.Update {
updates := f.pushedUpdates
f.pushedUpdates = nil
return updates
}

View File

@ -565,18 +565,6 @@ func (user *User) doEventPoll(ctx context.Context) error {
user.log.WithField("event", event).Debug("Handled API event")
// Wait for all events to be applied.
safe.RLock(func() {
for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) {
update := imap.NewNoop()
defer update.WaitContext(ctx)
updateCh.Enqueue(update)
}
}, user.updateChLock)
user.log.WithField("event", event).Debug("All events applied to gluon")
// Update the event ID in the vault.
if err := user.vault.SetEventID(event.EventID); err != nil {
return fmt.Errorf("failed to update event ID: %w", err)

View File

@ -110,7 +110,7 @@ func TestUser_AddressMode(t *testing.T) {
for _, imapConn := range imapConn {
go func(imapConn connector.Connector) {
for update := range imapConn.GetUpdates() {
update.Done()
update.Done(nil)
}
}(imapConn)
}
@ -226,7 +226,7 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma
for _, imapConn := range imapConn {
go func(imapConn connector.Connector) {
for update := range imapConn.GetUpdates() {
update.Done()
update.Done(nil)
}
}(imapConn)
}