diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go index 79e4b9f0..0ccbeced 100644 --- a/internal/services/imapservice/service.go +++ b/internal/services/imapservice/service.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "path/filepath" + "sync/atomic" "time" "github.com/ProtonMail/gluon/async" @@ -94,7 +95,7 @@ type Service struct { syncConfigPath string lastHandledEventID string - isSyncing bool + isSyncing atomic.Bool } func NewService( @@ -405,23 +406,28 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo continue } + // Start a goroutine to wait on event reset as it is possible that the sync received message + // was processed during an event publish. This in turn will block the imap service, since the + // event service is unable to reply to the request until the events have been processed. s.log.Info("Sync complete, starting API event stream") - if err := s.eventProvider.RewindEventID(ctx, s.lastHandledEventID); err != nil { - if errors.Is(err, context.Canceled) { - continue + go func() { + if err := s.eventProvider.RewindEventID(ctx, s.lastHandledEventID); err != nil { + if errors.Is(err, context.Canceled) { + return + } + + s.log.WithError(err).Error("Failed to rewind event service") + s.eventPublisher.PublishEvent(ctx, events.UserBadEvent{ + UserID: s.identityState.UserID(), + OldEventID: "", + NewEventID: "", + EventInfo: "", + Error: fmt.Errorf("failed to rewind event loop: %w", err), + }) } - s.log.WithError(err).Error("Failed to rewind event service") - s.eventPublisher.PublishEvent(ctx, events.UserBadEvent{ - UserID: s.identityState.UserID(), - OldEventID: "", - NewEventID: "", - EventInfo: "", - Error: fmt.Errorf("failed to rewind event loop: %w", err), - }) - } - - s.isSyncing = false + s.isSyncing.Store(false) + }() } case request, ok := <-s.syncUpdateApplier.requestCh: @@ -443,7 +449,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo continue } e.Consume(func(event proton.Event) error { - if s.isSyncing { + if s.isSyncing.Load() { if err := syncEventHandler.OnEvent(ctx, event); err != nil { return err } @@ -615,13 +621,13 @@ func (s *Service) setShowAllMail(v bool) { } func (s *Service) startSyncing() { - s.isSyncing = true + s.isSyncing.Store(true) s.syncHandler.Execute(s.syncReporter, s.labels.GetLabelMap(), s.syncUpdateApplier, s.syncMessageBuilder, syncservice.DefaultRetryCoolDown) } func (s *Service) cancelSync() { s.syncHandler.CancelAndWait() - s.isSyncing = false + s.isSyncing.Store(false) } type resyncReq struct{}