fix(GODT-2327): Delay event processing until gluon user exists
We don't want to start processing events until those events have somewhere to be sent to. Also, to be safe, ensure remove and re-add the gluon user while clearing its sync status. This shouldn't be necessary.
This commit is contained in:
parent
5ea53ea5c0
commit
232875d5cc
|
@ -131,9 +131,22 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error {
|
|||
// If the DB was newly created, clear the sync status; gluon's DB was not found.
|
||||
logrus.Warn("IMAP user DB was newly created, clearing sync status")
|
||||
|
||||
// Remove the user from IMAP so we can clear the sync status.
|
||||
if err := bridge.imapServer.RemoveUser(ctx, gluonID, false); err != nil {
|
||||
return fmt.Errorf("failed to remove IMAP user: %w", err)
|
||||
}
|
||||
|
||||
// Clear the sync status -- we need to resync all messages.
|
||||
if err := user.ClearSyncStatus(); err != nil {
|
||||
return fmt.Errorf("failed to clear sync status: %w", err)
|
||||
}
|
||||
|
||||
// Add the user back to the IMAP server.
|
||||
if isNew, err := bridge.imapServer.LoadUser(ctx, imapConn, gluonID, user.GluonKey()); err != nil {
|
||||
return fmt.Errorf("failed to add IMAP user: %w", err)
|
||||
} else if isNew {
|
||||
panic("IMAP user should already have a database")
|
||||
}
|
||||
} else if status := user.GetSyncStatus(); !status.HasLabels {
|
||||
// Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB.
|
||||
if err := bridge.imapServer.RemoveUser(ctx, gluonID, true); err != nil {
|
||||
|
@ -171,7 +184,12 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Trigger a sync for the user, if needed.
|
||||
user.TriggerSync()
|
||||
|
||||
// Start processing events for the user.
|
||||
user.StartEvents()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -171,6 +171,39 @@ func New(
|
|||
return nil
|
||||
})
|
||||
|
||||
// When triggered, poll the API for events, optionally blocking until the poll is complete.
|
||||
user.goPollAPIEvents = func(wait bool) {
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
go func() { user.pollAPIEventsCh <- doneCh }()
|
||||
|
||||
if wait {
|
||||
<-doneCh
|
||||
}
|
||||
}
|
||||
|
||||
// When triggered, attempt to sync the user.
|
||||
user.goSync = user.tasks.Trigger(func(ctx context.Context) {
|
||||
user.log.Debug("Sync triggered")
|
||||
|
||||
user.abortable.Do(ctx, func(ctx context.Context) {
|
||||
if user.vault.SyncStatus().IsComplete() {
|
||||
user.log.Debug("Sync is already complete, skipping")
|
||||
} else if err := user.doSync(ctx); err != nil {
|
||||
user.log.WithError(err).Error("Failed to sync, will retry later")
|
||||
time.AfterFunc(SyncRetryCooldown, user.goSync)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
return user, nil
|
||||
}
|
||||
|
||||
func (user *User) TriggerSync() {
|
||||
user.goSync()
|
||||
}
|
||||
|
||||
func (user *User) StartEvents() {
|
||||
// Stream events from the API, logging any errors that occur.
|
||||
// This does nothing until the sync has been marked as complete.
|
||||
// When we receive an API event, we attempt to handle it.
|
||||
|
@ -206,37 +239,6 @@ func New(
|
|||
}
|
||||
}
|
||||
})
|
||||
|
||||
// When triggered, poll the API for events, optionally blocking until the poll is complete.
|
||||
user.goPollAPIEvents = func(wait bool) {
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
go func() { user.pollAPIEventsCh <- doneCh }()
|
||||
|
||||
if wait {
|
||||
<-doneCh
|
||||
}
|
||||
}
|
||||
|
||||
// When triggered, attempt to sync the user.
|
||||
user.goSync = user.tasks.Trigger(func(ctx context.Context) {
|
||||
user.log.Debug("Sync triggered")
|
||||
|
||||
user.abortable.Do(ctx, func(ctx context.Context) {
|
||||
if user.vault.SyncStatus().IsComplete() {
|
||||
user.log.Debug("Sync is already complete, skipping")
|
||||
} else if err := user.doSync(ctx); err != nil {
|
||||
user.log.WithError(err).Error("Failed to sync, will retry later")
|
||||
time.AfterFunc(SyncRetryCooldown, user.goSync)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
return user, nil
|
||||
}
|
||||
|
||||
func (user *User) TriggerSync() {
|
||||
user.goSync()
|
||||
}
|
||||
|
||||
// ID returns the user's ID.
|
||||
|
@ -497,8 +499,19 @@ func (user *User) GetSyncStatus() vault.SyncStatus {
|
|||
}
|
||||
|
||||
// ClearSyncStatus clears the sync status of the user.
|
||||
// This also drops any updates in the update channel(s).
|
||||
func (user *User) ClearSyncStatus() error {
|
||||
return user.vault.ClearSyncStatus()
|
||||
user.log.Info("Clearing sync status")
|
||||
|
||||
return safe.LockRet(func() error {
|
||||
user.initUpdateCh(user.vault.AddressMode())
|
||||
|
||||
if err := user.vault.ClearSyncStatus(); err != nil {
|
||||
return fmt.Errorf("failed to clear sync status: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, user.eventLock, user.apiAddrsLock, user.updateChLock)
|
||||
}
|
||||
|
||||
// Logout logs the user out from the API.
|
||||
|
|
Loading…
Reference in New Issue