diff --git a/internal/services/syncservice/stage_build.go b/internal/services/syncservice/stage_build.go index 16cd4e97..55f9fd75 100644 --- a/internal/services/syncservice/stage_build.go +++ b/internal/services/syncservice/stage_build.go @@ -101,6 +101,23 @@ func (b *BuildStage) run(ctx context.Context) { continue } + if len(req.batch) == 0 { + // it is possible that if one does a mass delete on another client an entire download batch fails, + // and we reach this point without any messages to build. + req.onStageCompleted(ctx) + + if err := b.output.Produce(ctx, ApplyRequest{ + childJob: req.childJob, + messages: nil, + }); err != nil { + err = fmt.Errorf("failed to produce output for next stage: %w", err) + logrus.Errorf(err.Error()) + req.job.onError(err) + } + + continue + } + err = req.job.messageBuilder.WithKeys(func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error { chunks := chunkSyncBuilderBatch(req.batch, b.maxBuildMem) diff --git a/internal/services/syncservice/stage_build_test.go b/internal/services/syncservice/stage_build_test.go index 6b51f949..4169a1e1 100644 --- a/internal/services/syncservice/stage_build_test.go +++ b/internal/services/syncservice/stage_build_test.go @@ -321,3 +321,35 @@ func TestBuildStage_CancelledJobIsDiscarded(t *testing.T) { _, err := output.Consume(context.Background()) require.ErrorIs(t, err, ErrNoMoreInput) } + +func TestTask_EmptyInputDoesNotCrash(t *testing.T) { + mockCtrl := gomock.NewController(t) + + input := NewChannelConsumerProducer[BuildRequest]() + output := NewChannelConsumerProducer[ApplyRequest]() + reporter := mocks.NewMockReporter(mockCtrl) + + labels := getTestLabels() + + ctx, cancel := context.WithCancel(context.Background()) + tj := newTestJob(ctx, mockCtrl, "u", labels) + + tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(int64(10))) + + tj.job.begin() + childJob := tj.job.newChildJob("f", 10) + tj.job.end() + + stage := NewBuildStage(input, output, 1024, &async.NoopPanicHandler{}, reporter) + + go func() { + stage.run(ctx) + }() + + require.NoError(t, input.Produce(ctx, BuildRequest{childJob: childJob, batch: []proton.FullMessage{}})) + + req, err := output.Consume(ctx) + cancel() + require.NoError(t, err) + require.Len(t, req.messages, 0) +}