fix(GODT-3033): Unable to receive new mail
If the IMAP service happened to finish syncing and wanted to reset the user event service at a time the latter was publishing an event a deadlock would occur and the user would not receive any new messages. This change puts the request to revert the event id in a separate go-routine to avoid this situation from re-occurring. The operational flow remains unchanged as the event service will only process this request once the current set of events have been published.
This commit is contained in:
parent
8be4246f7e
commit
8b12a454ea
|
@ -22,6 +22,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ProtonMail/gluon/async"
|
||||
|
@ -94,7 +95,7 @@ type Service struct {
|
|||
|
||||
syncConfigPath string
|
||||
lastHandledEventID string
|
||||
isSyncing bool
|
||||
isSyncing atomic.Bool
|
||||
}
|
||||
|
||||
func NewService(
|
||||
|
@ -405,23 +406,28 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
|
|||
continue
|
||||
}
|
||||
|
||||
// Start a goroutine to wait on event reset as it is possible that the sync received message
|
||||
// was processed during an event publish. This in turn will block the imap service, since the
|
||||
// event service is unable to reply to the request until the events have been processed.
|
||||
s.log.Info("Sync complete, starting API event stream")
|
||||
if err := s.eventProvider.RewindEventID(ctx, s.lastHandledEventID); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
continue
|
||||
go func() {
|
||||
if err := s.eventProvider.RewindEventID(ctx, s.lastHandledEventID); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
s.log.WithError(err).Error("Failed to rewind event service")
|
||||
s.eventPublisher.PublishEvent(ctx, events.UserBadEvent{
|
||||
UserID: s.identityState.UserID(),
|
||||
OldEventID: "",
|
||||
NewEventID: "",
|
||||
EventInfo: "",
|
||||
Error: fmt.Errorf("failed to rewind event loop: %w", err),
|
||||
})
|
||||
}
|
||||
|
||||
s.log.WithError(err).Error("Failed to rewind event service")
|
||||
s.eventPublisher.PublishEvent(ctx, events.UserBadEvent{
|
||||
UserID: s.identityState.UserID(),
|
||||
OldEventID: "",
|
||||
NewEventID: "",
|
||||
EventInfo: "",
|
||||
Error: fmt.Errorf("failed to rewind event loop: %w", err),
|
||||
})
|
||||
}
|
||||
|
||||
s.isSyncing = false
|
||||
s.isSyncing.Store(false)
|
||||
}()
|
||||
}
|
||||
|
||||
case request, ok := <-s.syncUpdateApplier.requestCh:
|
||||
|
@ -443,7 +449,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
|
|||
continue
|
||||
}
|
||||
e.Consume(func(event proton.Event) error {
|
||||
if s.isSyncing {
|
||||
if s.isSyncing.Load() {
|
||||
if err := syncEventHandler.OnEvent(ctx, event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -615,13 +621,13 @@ func (s *Service) setShowAllMail(v bool) {
|
|||
}
|
||||
|
||||
func (s *Service) startSyncing() {
|
||||
s.isSyncing = true
|
||||
s.isSyncing.Store(true)
|
||||
s.syncHandler.Execute(s.syncReporter, s.labels.GetLabelMap(), s.syncUpdateApplier, s.syncMessageBuilder, syncservice.DefaultRetryCoolDown)
|
||||
}
|
||||
|
||||
func (s *Service) cancelSync() {
|
||||
s.syncHandler.CancelAndWait()
|
||||
s.isSyncing = false
|
||||
s.isSyncing.Store(false)
|
||||
}
|
||||
|
||||
type resyncReq struct{}
|
||||
|
|
Loading…
Reference in New Issue