Skip to content

Commit b313b7f

Browse files
authored
Delete workflow execution from visibility store even if it was deleted from main store (#3962)
1 parent 06188f3 commit b313b7f

File tree

5 files changed

+168
-31
lines changed

5 files changed

+168
-31
lines changed

service/frontend/service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadF
231231
DeleteNamespacePageSize: dc.GetIntProperty(dynamicconfig.DeleteNamespacePageSize, 1000),
232232
DeleteNamespacePagesPerExecution: dc.GetIntProperty(dynamicconfig.DeleteNamespacePagesPerExecution, 256),
233233
DeleteNamespaceConcurrentDeleteExecutionsActivities: dc.GetIntProperty(dynamicconfig.DeleteNamespaceConcurrentDeleteExecutionsActivities, 4),
234-
DeleteNamespaceNamespaceDeleteDelay: dc.GetDurationProperty(dynamicconfig.DeleteNamespaceNamespaceDeleteDelay, 0),
234+
DeleteNamespaceNamespaceDeleteDelay: dc.GetDurationProperty(dynamicconfig.DeleteNamespaceNamespaceDeleteDelay, 0*time.Hour),
235235

236236
EnableSchedules: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.FrontendEnableSchedules, true),
237237

service/worker/deletenamespace/deleteexecutions/activities.go

+42-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"context"
2929

3030
"go.temporal.io/api/serviceerror"
31+
workflowpb "go.temporal.io/api/workflow/v1"
3132
"go.temporal.io/sdk/activity"
3233

3334
"go.temporal.io/server/api/historyservice/v1"
@@ -137,9 +138,18 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete
137138
case nil:
138139
result.SuccessCount++
139140
a.metricsHandler.Counter(metrics.DeleteExecutionsSuccessCount.GetMetricName()).Record(1)
140-
case *serviceerror.NotFound: // Workflow execution doesn't exist. Do nothing.
141+
142+
case *serviceerror.NotFound:
141143
a.metricsHandler.Counter(metrics.DeleteExecutionNotFoundCount.GetMetricName()).Record(1)
142-
a.logger.Info("Workflow execution is not found.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()))
144+
a.logger.Info("Workflow execution is not found in history service.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()))
145+
// The reasons why workflow execution doesn't exist in history service, but exists in visibility store might be:
146+
// 1. The workflow execution was deleted by someone else after last ListWorkflowExecutions call but before historyClient.DeleteWorkflowExecution call.
147+
// 2. Database is in inconsistent state: workflow execution was manually deleted from history store, but not from visibility store.
148+
// To avoid continuously getting this workflow execution from visibility store, it needs to be deleted directly from visibility store.
149+
s, e := a.deleteWorkflowExecutionFromVisibility(ctx, params.NamespaceID, execution)
150+
result.SuccessCount += s
151+
result.ErrorCount += e
152+
143153
default:
144154
result.ErrorCount++
145155
a.metricsHandler.Counter(metrics.DeleteExecutionFailuresCount.GetMetricName()).Record(1)
@@ -154,3 +164,33 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete
154164
}
155165
return result, nil
156166
}
167+
168+
func (a *Activities) deleteWorkflowExecutionFromVisibility(
169+
ctx context.Context,
170+
namespaceID namespace.ID,
171+
execution *workflowpb.WorkflowExecutionInfo,
172+
) (successCount int, errorCount int) {
173+
174+
a.logger.Info("Deleting workflow execution from visibility.", tag.WorkflowNamespaceID(namespaceID.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()))
175+
_, err := a.historyClient.DeleteWorkflowVisibilityRecord(ctx, &historyservice.DeleteWorkflowVisibilityRecordRequest{
176+
NamespaceId: namespaceID.String(),
177+
Execution: execution.GetExecution(),
178+
WorkflowStartTime: execution.GetStartTime(),
179+
WorkflowCloseTime: execution.GetCloseTime(),
180+
})
181+
switch err.(type) {
182+
case nil:
183+
// Indicates that main and visibility stores were in inconsistent state.
184+
a.metricsHandler.Counter(metrics.DeleteExecutionsSuccessCount.GetMetricName()).Record(1)
185+
a.logger.Info("Workflow execution deleted from visibility.", tag.WorkflowNamespaceID(namespaceID.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()))
186+
return 1, 0
187+
case *serviceerror.NotFound:
188+
// Indicates that workflow execution was deleted by someone else.
189+
a.logger.Error("Workflow execution is not found in visibility store.", tag.WorkflowNamespaceID(namespaceID.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()))
190+
return 0, 0
191+
default:
192+
a.metricsHandler.Counter(metrics.DeleteExecutionFailuresCount.GetMetricName()).Record(1)
193+
a.logger.Error("Unable to delete workflow execution from visibility store.", tag.WorkflowNamespaceID(namespaceID.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()), tag.Error(err))
194+
return 0, 1
195+
}
196+
}

service/worker/deletenamespace/deleteexecutions/workflow.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
172172
}
173173
}
174174

175+
// If nextPageToken is nil then there are no more workflow executions to delete.
175176
if nextPageToken == nil {
176177
if result.ErrorCount == 0 {
177178
logger.Info("Successfully deleted workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount))
@@ -181,7 +182,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
181182
return result, nil
182183
}
183184

184-
// Too many workflow executions, and ConcurrentDeleteExecutionsActivities activities has been started already.
185+
// Too many workflow executions, and ConcurrentDeleteExecutionsActivities number of activities has been completed already.
185186
// Continue as new to prevent workflow history size explosion.
186187

187188
params.PreviousSuccessCount = result.SuccessCount

service/worker/deletenamespace/deleteexecutions/workflow_test.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ContinueAsNew(t *testing.T) {
190190
var a *Activities
191191

192192
env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return([]byte{3, 22, 83}, nil).Times(78)
193-
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{SuccessCount: 1}, nil).Times(78)
193+
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{SuccessCount: 1, ErrorCount: 0}, nil).Times(78)
194194

195195
env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
196196
NamespaceID: "namespace-id",
@@ -306,8 +306,12 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T)
306306

307307
historyClient := historyservicemock.NewMockHistoryServiceClient(ctrl)
308308
historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil).Times(2)
309-
// NotFound errors should not affect the error count.
309+
310310
historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewNotFound("not found")).Times(2)
311+
// NotFound errors should not affect neither error nor success count.
312+
historyClient.EXPECT().DeleteWorkflowVisibilityRecord(gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewNotFound("not found"))
313+
// NotFound in main store but no error from visibility store adds 1 to success count.
314+
historyClient.EXPECT().DeleteWorkflowVisibilityRecord(gomock.Any(), gomock.Any()).Return(nil, nil)
311315

312316
a := &Activities{
313317
visibilityManager: visibilityManager,
@@ -333,7 +337,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T)
333337
var result DeleteExecutionsResult
334338
require.NoError(t, env.GetWorkflowResult(&result))
335339
require.Equal(t, 0, result.ErrorCount)
336-
require.Equal(t, 2, result.SuccessCount)
340+
require.Equal(t, 3, result.SuccessCount)
337341
}
338342

339343
func Test_DeleteExecutionsWorkflow_NoActivityMocks_HistoryClientError(t *testing.T) {

tests/namespace_delete_test.go

+116-24
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ import (
4343
"go.temporal.io/api/workflowservice/v1"
4444

4545
"go.temporal.io/server/api/adminservice/v1"
46+
"go.temporal.io/server/common"
4647
"go.temporal.io/server/common/backoff"
4748
"go.temporal.io/server/common/dynamicconfig"
4849
"go.temporal.io/server/common/log"
50+
"go.temporal.io/server/common/persistence"
4951
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
5052
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
5153
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
@@ -190,24 +192,28 @@ func (s *namespaceTestSuite) Test_NamespaceDelete_WithWorkflows() {
190192
nsID := descResp.GetNamespaceInfo().GetId()
191193

192194
// Start few workflow executions.
195+
var executions []*commonpb.WorkflowExecution
193196
for i := 0; i < 100; i++ {
194-
_, err = s.frontendClient.StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
197+
wid := "wf_id_" + strconv.Itoa(i)
198+
resp, err := s.frontendClient.StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
195199
RequestId: uuid.New(),
196200
Namespace: "ns_name_los_angeles",
197-
WorkflowId: "wf_id_" + strconv.Itoa(i),
201+
WorkflowId: wid,
198202
WorkflowType: &commonpb.WorkflowType{Name: "workflowTypeName"},
199203
TaskQueue: &taskqueuepb.TaskQueue{Name: "taskQueueName"},
200204
})
201205
s.NoError(err)
206+
executions = append(executions, &commonpb.WorkflowExecution{
207+
WorkflowId: wid,
208+
RunId: resp.GetRunId(),
209+
})
202210
}
203211

204212
// Terminate some workflow executions.
205-
for i := 0; i < 30; i++ {
213+
for _, execution := range executions[:30] {
206214
_, err = s.frontendClient.TerminateWorkflowExecution(ctx, &workflowservice.TerminateWorkflowExecutionRequest{
207-
Namespace: "ns_name_los_angeles",
208-
WorkflowExecution: &commonpb.WorkflowExecution{
209-
WorkflowId: "wf_id_" + strconv.Itoa(i),
210-
},
215+
Namespace: "ns_name_los_angeles",
216+
WorkflowExecution: execution,
211217
})
212218
s.NoError(err)
213219
}
@@ -224,37 +230,123 @@ func (s *namespaceTestSuite) Test_NamespaceDelete_WithWorkflows() {
224230
s.NoError(err)
225231
s.Equal(enumspb.NAMESPACE_STATE_DELETED, descResp2.GetNamespaceInfo().GetState())
226232

227-
namespaceExistsOp := func() error {
233+
s.Eventually(func() bool {
228234
_, err := s.frontendClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
229235
Id: nsID,
230236
})
231237
var notFound *serviceerror.NamespaceNotFound
232-
if errors.As(err, &notFound) {
233-
_, err0 := s.frontendClient.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
234-
Namespace: "ns_name_los_angeles",
235-
Execution: &commonpb.WorkflowExecution{
236-
WorkflowId: "wf_id_0",
237-
},
238-
})
239-
_, err99 := s.frontendClient.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
238+
if !errors.As(err, &notFound) {
239+
return false // namespace still exists
240+
}
241+
242+
for _, execution := range executions {
243+
_, err = s.frontendClient.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
240244
Namespace: "ns_name_los_angeles",
241245
Execution: &commonpb.WorkflowExecution{
242-
WorkflowId: "wf_id_99",
246+
WorkflowId: execution.GetWorkflowId(),
243247
},
244248
})
245-
if errors.As(err0, &notFound) && errors.As(err99, &notFound) {
246-
return nil
249+
if !errors.As(err, &notFound) {
250+
return false // should never happen
247251
}
248252
}
249-
return errors.New("namespace still exists")
253+
return true
254+
}, 20*time.Second, time.Second)
255+
}
256+
257+
func (s *namespaceTestSuite) Test_NamespaceDelete_WithMissingWorkflows() {
258+
ctx, cancel := rpc.NewContextWithTimeoutAndVersionHeaders(10000 * time.Second)
259+
defer cancel()
260+
261+
retention := 24 * time.Hour
262+
_, err := s.frontendClient.RegisterNamespace(ctx, &workflowservice.RegisterNamespaceRequest{
263+
Namespace: "ns_name_los_angeles",
264+
Description: "Namespace to delete",
265+
WorkflowExecutionRetentionPeriod: &retention,
266+
HistoryArchivalState: enumspb.ARCHIVAL_STATE_DISABLED,
267+
VisibilityArchivalState: enumspb.ARCHIVAL_STATE_DISABLED,
268+
})
269+
s.NoError(err)
270+
// DescribeNamespace reads directly from database but namespace validator uses cache.
271+
s.cluster.RefreshNamespaceCache()
272+
273+
descResp, err := s.frontendClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
274+
Namespace: "ns_name_los_angeles",
275+
})
276+
s.NoError(err)
277+
nsID := descResp.GetNamespaceInfo().GetId()
278+
279+
// Start few workflow executions.
280+
281+
var executions []*commonpb.WorkflowExecution
282+
for i := 0; i < 10; i++ {
283+
wid := "wf_id_" + strconv.Itoa(i)
284+
resp, err := s.frontendClient.StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
285+
RequestId: uuid.New(),
286+
Namespace: "ns_name_los_angeles",
287+
WorkflowId: wid,
288+
WorkflowType: &commonpb.WorkflowType{Name: "workflowTypeName"},
289+
TaskQueue: &taskqueuepb.TaskQueue{Name: "taskQueueName"},
290+
})
291+
s.NoError(err)
292+
executions = append(executions, &commonpb.WorkflowExecution{
293+
WorkflowId: wid,
294+
RunId: resp.GetRunId(),
295+
})
250296
}
251297

252-
namespaceExistsPolicy := backoff.NewExponentialRetryPolicy(time.Second).
253-
WithBackoffCoefficient(1).
254-
WithExpirationInterval(30 * time.Second)
298+
// Delete some workflow executions from DB but not from visibility.
299+
// Every subsequent delete (from deleteexecutions.Workflow) from ES will take at least 1s due to bulk processor.
300+
for _, execution := range executions[0:5] {
301+
shardID := common.WorkflowIDToHistoryShard(
302+
nsID,
303+
execution.GetWorkflowId(),
304+
s.clusterConfig.HistoryConfig.NumHistoryShards,
305+
)
306+
307+
err = s.cluster.GetExecutionManager().DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{
308+
ShardID: shardID,
309+
NamespaceID: nsID,
310+
WorkflowID: execution.GetWorkflowId(),
311+
RunID: execution.GetRunId(),
312+
})
313+
s.NoError(err)
314+
}
255315

256-
err = backoff.ThrottleRetry(namespaceExistsOp, namespaceExistsPolicy, func(_ error) bool { return true })
316+
delResp, err := s.operatorClient.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
317+
Namespace: "ns_name_los_angeles",
318+
})
319+
s.NoError(err)
320+
s.Equal("ns_name_los_angeles-deleted-"+nsID[:5], delResp.GetDeletedNamespace())
321+
322+
descResp2, err := s.frontendClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
323+
Id: nsID,
324+
})
257325
s.NoError(err)
326+
s.Equal(enumspb.NAMESPACE_STATE_DELETED, descResp2.GetNamespaceInfo().GetState())
327+
328+
s.Eventually(func() bool {
329+
_, err := s.frontendClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
330+
Id: nsID,
331+
})
332+
var notFound *serviceerror.NamespaceNotFound
333+
if !errors.As(err, &notFound) {
334+
return false // namespace still exists
335+
}
336+
337+
for _, execution := range executions {
338+
_, err = s.frontendClient.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
339+
Namespace: "ns_name_los_angeles",
340+
Execution: &commonpb.WorkflowExecution{
341+
WorkflowId: execution.GetWorkflowId(),
342+
},
343+
})
344+
if !errors.As(err, &notFound) {
345+
return false // should never happen
346+
}
347+
}
348+
return true
349+
}, 20*time.Second, time.Second)
258350
}
259351

260352
func (s *namespaceTestSuite) Test_NamespaceDelete_CrossNamespaceChild() {

0 commit comments

Comments
 (0)