Skip to content

Commit 0101924

Browse files
authored
Improve execution scavenger (#3674)
* Improve execution scavenger
1 parent f6a340d commit 0101924

File tree

3 files changed

+41
-19
lines changed

3 files changed

+41
-19
lines changed

common/metrics/metric_defs.go

+1
Original file line numberDiff line numberDiff line change
@@ -1651,6 +1651,7 @@ var (
16511651
ArchiverWorkflowStoppingCount = NewCounterDef("archiver_workflow_stopping")
16521652
ScavengerValidationRequestsCount = NewCounterDef("scavenger_validation_requests")
16531653
ScavengerValidationFailuresCount = NewCounterDef("scavenger_validation_failures")
1654+
ScavengerValidationSkipsCount = NewCounterDef("scavenger_validation_skips")
16541655
AddSearchAttributesFailuresCount = NewCounterDef("add_search_attributes_failures")
16551656
DeleteNamespaceSuccessCount = NewCounterDef("delete_namespace_success")
16561657
RenameNamespaceSuccessCount = NewCounterDef("rename_namespace_success")

service/worker/scanner/executions/mutable_state_validator.go

+14-11
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,20 @@ func (v *mutableStateValidator) Validate(
9191

9292
var results []MutableStateValidationResult
9393

94+
// First, check whether the data has expired based on its retention time.
95+
retentionResult, err := v.validateRetention(
96+
mutableState.GetExecutionInfo(),
97+
mutableState.GetExecutionState().GetState(),
98+
)
99+
if err != nil {
100+
return results, err
101+
}
102+
if retentionResult != nil {
103+
// Skip all validation if the data is expired.
104+
results = append(results, *retentionResult)
105+
return results, nil
106+
}
107+
94108
results = append(results, v.validateActivity(
95109
mutableState.ActivityInfos,
96110
lastItem.GetEventId())...,
@@ -116,17 +130,6 @@ func (v *mutableStateValidator) Validate(
116130
lastItem.GetEventId())...,
117131
)
118132

119-
retentionResult, err := v.validateRetention(
120-
mutableState.GetExecutionInfo(),
121-
mutableState.GetExecutionState().GetState(),
122-
)
123-
if err != nil {
124-
return results, err
125-
}
126-
if retentionResult != nil {
127-
results = append(results, *retentionResult)
128-
}
129-
130133
return results, nil
131134
}
132135

service/worker/scanner/executions/task.go

+26-8
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func newTask(
9494
historyClient: historyClient,
9595
adminClient: adminClient,
9696

97-
metricsHandler: metricsHandler,
97+
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ExecutionsScavengerScope)),
9898
logger: logger,
9999
scavenger: scavenger,
100100

@@ -112,12 +112,15 @@ func (t *task) Run() executor.TaskStatus {
112112
))
113113

114114
iter := collection.NewPagingIteratorWithToken(t.getPaginationFn(), t.paginationToken)
115+
var retryTask bool
115116
for iter.HasNext() {
116117
_ = t.rateLimiter.Wait(t.ctx)
117118
record, err := iter.Next()
118119
if err != nil {
120+
t.metricsHandler.Counter(metrics.ScavengerValidationSkipsCount.GetMetricName()).Record(1)
121+
// Continue the validation process and retry after all workflow records have been iterated.
119122
t.logger.Error("unable to paginate concrete execution", tag.ShardID(t.shardID), tag.Error(err))
120-
return executor.TaskStatusDefer
123+
retryTask = true
121124
}
122125

123126
mutableState := &MutableState{WorkflowMutableState: record}
@@ -130,10 +133,21 @@ func (t *task) Run() executor.TaskStatus {
130133
)
131134
err = t.handleFailures(mutableState, results)
132135
if err != nil {
133-
t.logger.Error("unable to process failure result", tag.ShardID(t.shardID), tag.Error(err))
134-
return executor.TaskStatusDefer
136+
// Continue the validation process and retry after all workflow records have been iterated.
137+
executionInfo := mutableState.GetExecutionInfo()
138+
t.metricsHandler.Counter(metrics.ScavengerValidationSkipsCount.GetMetricName()).Record(1)
139+
t.logger.Error("unable to process failure result",
140+
tag.ShardID(t.shardID),
141+
tag.Error(err),
142+
tag.WorkflowNamespaceID(executionInfo.GetNamespaceId()),
143+
tag.WorkflowID(executionInfo.GetWorkflowId()),
144+
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()))
145+
retryTask = true
135146
}
136147
}
148+
if retryTask {
149+
return executor.TaskStatusDefer
150+
}
137151
return executor.TaskStatusDone
138152
}
139153

@@ -167,6 +181,11 @@ func (t *task) validate(
167181
results = append(results, validationResults...)
168182
}
169183

184+
// Fail fast if the mutable state is corrupted; there is no need to validate history.
185+
if len(results) > 0 {
186+
return results
187+
}
188+
170189
if validationResults, err := NewHistoryEventIDValidator(
171190
t.shardID,
172191
t.executionManager,
@@ -254,15 +273,14 @@ func printValidationResult(
254273
metricsHandler metrics.MetricsHandler,
255274
logger log.Logger,
256275
) {
257-
handler := metricsHandler.WithTags(metrics.OperationTag(metrics.ExecutionsScavengerScope), metrics.FailureTag(""))
258-
handler.Counter(metrics.ScavengerValidationRequestsCount.GetMetricName()).Record(1)
276+
metricsHandler.Counter(metrics.ScavengerValidationRequestsCount.GetMetricName()).Record(1)
259277
if len(results) == 0 {
260278
return
261279
}
262280

263-
handler.Counter(metrics.ScavengerValidationFailuresCount.GetMetricName()).Record(1)
281+
metricsHandler.Counter(metrics.ScavengerValidationFailuresCount.GetMetricName()).Record(1)
264282
for _, result := range results {
265-
handler.Counter(metrics.ScavengerValidationFailuresCount.GetMetricName()).Record(1, metrics.FailureTag(result.failureType))
283+
metricsHandler.Counter(metrics.ScavengerValidationFailuresCount.GetMetricName()).Record(1, metrics.FailureTag(result.failureType))
266284
logger.Info(
267285
"validation failed for execution.",
268286
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().GetNamespaceId()),

0 commit comments

Comments
 (0)