GODT-2196: Do not generate message updates for unknown labels
During sync a user may continue to perform operations on the server it is possible we run into a message which has a labelID we are not aware of. To counter this we issue `CreateMessage` updates with `IgnoreUnknownMailboxIDs` set to true. Eventually, after sync the state will resolve itself with events.
This commit is contained in:
parent
48480bc839
commit
4b3d4690e8
2
go.mod
2
go.mod
|
@ -5,7 +5,7 @@ go 1.18
|
|||
require (
|
||||
github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557
|
||||
github.com/Masterminds/semver/v3 v3.1.1
|
||||
github.com/ProtonMail/gluon v0.14.2-0.20221214122222-2ab5c92d3546
|
||||
github.com/ProtonMail/gluon v0.14.2-0.20221215082650-7ba466ad6482
|
||||
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a
|
||||
github.com/ProtonMail/go-proton-api v0.2.2-0.20221213121236-3439b3eda101
|
||||
github.com/ProtonMail/go-rfc5322 v0.11.0
|
||||
|
|
6
go.sum
6
go.sum
|
@ -28,8 +28,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs
|
|||
github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo=
|
||||
github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkFn/XLzSqPOB5AAthuk9xPk=
|
||||
github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g=
|
||||
github.com/ProtonMail/gluon v0.14.2-0.20221214122222-2ab5c92d3546 h1:iyN4eO1Z0N+inMukpoBCmfbI+ubAop4Op/sdzmmUcm4=
|
||||
github.com/ProtonMail/gluon v0.14.2-0.20221214122222-2ab5c92d3546/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q=
|
||||
github.com/ProtonMail/gluon v0.14.2-0.20221215082650-7ba466ad6482 h1:5sriag5qQ5kJ9oc3LJc+aSDyxEDG1ZfZGgVDcSNvC+4=
|
||||
github.com/ProtonMail/gluon v0.14.2-0.20221215082650-7ba466ad6482/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q=
|
||||
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4=
|
||||
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4=
|
||||
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=
|
||||
|
@ -43,8 +43,6 @@ github.com/ProtonMail/go-message v0.0.0-20210611055058-fabeff2ec753/go.mod h1:NB
|
|||
github.com/ProtonMail/go-mime v0.0.0-20220302105931-303f85f7fe0f/go.mod h1:NYt+V3/4rEeDuaev/zw1zCq8uqVEuPHzDPo3OZrlGJ4=
|
||||
github.com/ProtonMail/go-mime v0.0.0-20220429130430-2192574d760f h1:4IWzKjHzZxdrW9k4zl/qCwenOVHDbVDADPPHFLjs0Oc=
|
||||
github.com/ProtonMail/go-mime v0.0.0-20220429130430-2192574d760f/go.mod h1:qRZgbeASl2a9OwmsV85aWwRqic0NHPh+9ewGAzb4cgM=
|
||||
github.com/ProtonMail/go-proton-api v0.2.2-0.20221213042823-5bfe853434e7 h1:jreVsSvIlslQpDks/OhEL1YyI1XBmhgzYDdoQ9u68UA=
|
||||
github.com/ProtonMail/go-proton-api v0.2.2-0.20221213042823-5bfe853434e7/go.mod h1:O7ZTIDOhJRkfQgtW8dB0ZSCq8OZsShjMQ3ahzpDheOk=
|
||||
github.com/ProtonMail/go-proton-api v0.2.2-0.20221213121236-3439b3eda101 h1:3VvTdvETVVRYoiTj6GHaAiLUD1DQp72wbeQAQdw9lIY=
|
||||
github.com/ProtonMail/go-proton-api v0.2.2-0.20221213121236-3439b3eda101/go.mod h1:JUo5IQG0hNuPRuDpOUsCOvtee6UjTEHHF1QN2i8RSos=
|
||||
github.com/ProtonMail/go-rfc5322 v0.11.0 h1:o5Obrm4DpmQEffvgsVqG6S4BKwC1Wat+hYwjIp2YcCY=
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"runtime"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ProtonMail/gluon/rfc822"
|
||||
"github.com/ProtonMail/go-proton-api"
|
||||
|
@ -209,7 +210,134 @@ func TestBridge_Sync_BadMessage(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func withClient(ctx context.Context, t *testing.T, s *server.Server, username string, password []byte, fn func(context.Context, *proton.Client)) {
|
||||
func TestBridge_SyncWithOngoingEvents(t *testing.T) {
|
||||
numMsg := 1 << 8
|
||||
messageSplitIndex := numMsg * 2 / 3
|
||||
renmainingMessageCount := numMsg - messageSplitIndex
|
||||
|
||||
messages := make([]string, 0, numMsg)
|
||||
|
||||
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
|
||||
userID, addrID, err := s.CreateUser("imap", password)
|
||||
require.NoError(t, err)
|
||||
|
||||
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
|
||||
require.NoError(t, err)
|
||||
|
||||
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
|
||||
importResults := createNumMessages(ctx, t, c, addrID, labelID, numMsg)
|
||||
for _, v := range importResults {
|
||||
if len(v) != 0 {
|
||||
messages = append(messages, v)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
var total uint64
|
||||
|
||||
// The initial user should be fully synced.
|
||||
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, mocks *bridge.Mocks) {
|
||||
syncCh, done := chToType[events.Event, events.SyncFinished](bridge.GetEvents(events.SyncFinished{}))
|
||||
defer done()
|
||||
|
||||
// Count how many bytes it takes to fully sync the user.
|
||||
total = countBytesRead(netCtl, func() {
|
||||
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, userID, (<-syncCh).UserID)
|
||||
})
|
||||
})
|
||||
|
||||
// Now let's remove the user and stop the network at 2/3 of the data.
|
||||
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, mocks *bridge.Mocks) {
|
||||
require.NoError(t, bridge.DeleteUser(ctx, userID))
|
||||
})
|
||||
|
||||
// Pretend we can only sync 2/3 of the original messages.
|
||||
netCtl.SetReadLimit(2 * total / 3)
|
||||
|
||||
// Login the user; its sync should fail.
|
||||
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(b *bridge.Bridge, mocks *bridge.Mocks) {
|
||||
{
|
||||
syncCh, done := chToType[events.Event, events.SyncFailed](b.GetEvents(events.SyncFailed{}))
|
||||
defer done()
|
||||
|
||||
userID, err := b.LoginFull(ctx, "imap", password, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, userID, (<-syncCh).UserID)
|
||||
|
||||
info, err := b.GetUserInfo(userID)
|
||||
require.NoError(t, err)
|
||||
require.True(t, info.State == bridge.Connected)
|
||||
|
||||
client, err := client.Dial(fmt.Sprintf("%v:%v", constants.Host, b.GetIMAPPort()))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
|
||||
defer func() { _ = client.Logout() }()
|
||||
|
||||
status, err := client.Select(`Folders/folder`, false)
|
||||
require.NoError(t, err)
|
||||
require.Less(t, status.Messages, uint32(numMsg))
|
||||
}
|
||||
|
||||
// Create a new mailbox and move that last 1/3 of the messages into it to simulate user
|
||||
// actions during sync.
|
||||
{
|
||||
newLabelID, err := s.CreateLabel(userID, "folder2", "", proton.LabelTypeFolder)
|
||||
require.NoError(t, err)
|
||||
|
||||
messages := messages[messageSplitIndex:]
|
||||
|
||||
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
|
||||
require.NoError(t, c.UnlabelMessages(ctx, messages, labelID))
|
||||
require.NoError(t, c.LabelMessages(ctx, messages, newLabelID))
|
||||
})
|
||||
}
|
||||
|
||||
// Remove the network limit, allowing the sync to finish.
|
||||
netCtl.SetReadLimit(0)
|
||||
|
||||
{
|
||||
syncCh, done := chToType[events.Event, events.SyncFinished](b.GetEvents(events.SyncFinished{}))
|
||||
defer done()
|
||||
|
||||
require.Equal(t, userID, (<-syncCh).UserID)
|
||||
|
||||
info, err := b.GetUserInfo(userID)
|
||||
require.NoError(t, err)
|
||||
require.True(t, info.State == bridge.Connected)
|
||||
|
||||
client, err := client.Dial(fmt.Sprintf("%v:%v", constants.Host, b.GetIMAPPort()))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
|
||||
defer func() { _ = client.Logout() }()
|
||||
|
||||
status, err := client.Select(`Folders/folder`, false)
|
||||
require.NoError(t, err)
|
||||
// Original folder should have more than 0 messages and less than the total.
|
||||
require.Greater(t, status.Messages, uint32(0))
|
||||
require.Less(t, status.Messages, uint32(numMsg))
|
||||
|
||||
// Check that the new messages arrive in the right location.
|
||||
require.Eventually(t, func() bool {
|
||||
status, err := client.Select(`Folders/folder2`, true)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if status.Messages != uint32(renmainingMessageCount) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}, 10*time.Second, 500*time.Millisecond)
|
||||
}
|
||||
})
|
||||
}, server.WithTLS(false))
|
||||
}
|
||||
|
||||
func withClient(ctx context.Context, t *testing.T, s *server.Server, username string, password []byte, fn func(context.Context, *proton.Client)) { //nolint:unparam
|
||||
m := proton.New(
|
||||
proton.WithHostURL(s.GetHostURL()),
|
||||
proton.WithTransport(proton.InsecureTransport()),
|
||||
|
|
|
@ -464,7 +464,7 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes
|
|||
user.log.WithError(err).Error("Failed to remove failed message ID from vault")
|
||||
}
|
||||
|
||||
user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(res.update))
|
||||
user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(false, res.update))
|
||||
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -47,7 +47,7 @@ func (f *flusher) push(update *imap.MessageCreated) {
|
|||
|
||||
func (f *flusher) flush() {
|
||||
if len(f.updates) > 0 {
|
||||
f.updateCh.Enqueue(imap.NewMessagesCreated(f.updates...))
|
||||
f.updateCh.Enqueue(imap.NewMessagesCreated(true, f.updates...))
|
||||
f.updates = nil
|
||||
f.curChunkSize = 0
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue