Skip to content

Commit f9bd5b0

Browse files
authored
DeleteExecutions workflow: pass nextPageToken with ContinueAsNewError (#3966)
1 parent cbed0fe commit f9bd5b0

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

service/worker/deletenamespace/deleteexecutions/workflow.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type (
5050
PreviousSuccessCount int
5151
PreviousErrorCount int
5252
ContinueAsNewCount int
53+
NextPageToken []byte
5354
}
5455

5556
DeleteExecutionsResult struct {
@@ -105,7 +106,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
105106
logger.Info("Effective config.", tag.Value(params.Config.String()))
106107

107108
var a *Activities
108-
var nextPageToken []byte
109+
nextPageToken := params.NextPageToken
109110
runningDeleteExecutionsActivityCount := 0
110111
runningDeleteExecutionsSelector := workflow.NewSelector(ctx)
111112
var lastDeleteExecutionsActivityErr error
@@ -186,6 +187,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
186187
params.PreviousSuccessCount = result.SuccessCount
187188
params.PreviousErrorCount = result.ErrorCount
188189
params.ContinueAsNewCount++
190+
params.NextPageToken = nextPageToken
189191

190192
logger.Info("There are more workflows to delete. Continuing workflow as new.", tag.WorkflowType(WorkflowName), tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount), tag.Counter(params.ContinueAsNewCount))
191193
return result, workflow.NewContinueAsNewError(ctx, DeleteExecutionsWorkflow, params)

service/worker/deletenamespace/deleteexecutions/workflow_test.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"go.temporal.io/server/common/log"
4545
"go.temporal.io/server/common/metrics"
4646
"go.temporal.io/server/common/namespace"
47+
"go.temporal.io/server/common/payloads"
4748
"go.temporal.io/server/common/persistence/visibility/manager"
4849
)
4950

@@ -189,7 +190,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ContinueAsNew(t *testing.T) {
189190
var a *Activities
190191

191192
env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return([]byte{3, 22, 83}, nil).Times(78)
192-
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{}, nil).Times(78)
193+
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{SuccessCount: 1}, nil).Times(78)
193194

194195
env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
195196
NamespaceID: "namespace-id",
@@ -205,6 +206,14 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ContinueAsNew(t *testing.T) {
205206
require.Error(t, wfErr)
206207
var errContinueAsNew *workflow.ContinueAsNewError
207208
require.ErrorAs(t, wfErr, &errContinueAsNew)
209+
210+
require.NotNil(t, errContinueAsNew.Input)
211+
var newWfParams DeleteExecutionsParams
212+
err := payloads.Decode(errContinueAsNew.Input, &newWfParams)
213+
require.NoError(t, err)
214+
require.Equal(t, 78, newWfParams.PreviousSuccessCount)
215+
require.Equal(t, 0, newWfParams.PreviousErrorCount)
216+
require.Equal(t, []byte{3, 22, 83}, newWfParams.NextPageToken)
208217
}
209218

210219
func Test_DeleteExecutionsWorkflow_ManyExecutions_ActivityError(t *testing.T) {

0 commit comments

Comments
 (0)