Skip to content

Commit eb4b574

Browse files
Fix some bugs in the archival queue task executor (#3813)
Fix two bugs in the archival queue task executor
1 parent 5413d18 commit eb4b574

File tree

3 files changed

+51
-43
lines changed

3 files changed

+51
-43
lines changed

common/metrics/metric_defs.go

+3
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ const (
6767

6868
MutableStateCacheTypeTagValue = "mutablestate"
6969
EventsCacheTypeTagValue = "events"
70+
71+
InvalidHistoryURITagValue = "invalid_history_uri"
72+
InvalidVisibilityURITagValue = "invalid_visibility_uri"
7073
)
7174

7275
// Common service base metrics

service/history/archival_queue_task_executor.go

+36-39
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,11 @@ func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Cont
112112
if err != nil {
113113
return err
114114
}
115-
_, err = e.archiver.Archive(ctx, request)
116-
if err != nil {
117-
return err
115+
if len(request.Targets) > 0 {
116+
_, err = e.archiver.Archive(ctx, request)
117+
if err != nil {
118+
return err
119+
}
118120
}
119121
return e.addDeletionTask(ctx, logger, task, request.CloseTime)
120122
}
@@ -154,52 +156,47 @@ func (e *archivalQueueTaskExecutor) getArchiveTaskRequest(
154156
return nil, err
155157
}
156158

159+
var historyURI, visibilityURI carchiver.URI
157160
var targets []archival.Target
158161
if e.shardContext.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() &&
159162
namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
160163
targets = append(targets, archival.TargetVisibility)
164+
visibilityURIString := namespaceEntry.VisibilityArchivalState().URI
165+
visibilityURI, err = carchiver.NewURI(visibilityURIString)
166+
if err != nil {
167+
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
168+
1,
169+
metrics.NamespaceTag(namespaceName.String()),
170+
metrics.FailureTag(metrics.InvalidVisibilityURITagValue),
171+
)
172+
logger.Error(
173+
"Failed to parse visibility URI.",
174+
tag.ArchivalURI(visibilityURIString),
175+
tag.Error(err),
176+
)
177+
return nil, fmt.Errorf("failed to parse visibility URI for archival task: %w", err)
178+
}
161179
}
162180
if e.shardContext.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() &&
163181
namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
182+
historyURIString := namespaceEntry.HistoryArchivalState().URI
183+
historyURI, err = carchiver.NewURI(historyURIString)
184+
if err != nil {
185+
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
186+
1,
187+
metrics.NamespaceTag(namespaceName.String()),
188+
metrics.FailureTag(metrics.InvalidHistoryURITagValue),
189+
)
190+
logger.Error(
191+
"Failed to parse history URI.",
192+
tag.ArchivalURI(historyURIString),
193+
tag.Error(err),
194+
)
195+
return nil, fmt.Errorf("failed to parse history URI for archival task: %w", err)
196+
}
164197
targets = append(targets, archival.TargetHistory)
165198
}
166-
if len(targets) == 0 {
167-
return nil, fmt.Errorf(
168-
"no archival targets configured for archive execution task: %+v",
169-
task.WorkflowKey,
170-
)
171-
}
172199

173-
historyURIString := namespaceEntry.HistoryArchivalState().URI
174-
historyURI, err := carchiver.NewURI(historyURIString)
175-
if err != nil {
176-
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
177-
1,
178-
metrics.NamespaceTag(namespaceName.String()),
179-
metrics.FailureTag("invalid_history_uri"),
180-
)
181-
logger.Error(
182-
"Failed to parse history URI.",
183-
tag.ArchivalURI(historyURIString),
184-
tag.Error(err),
185-
)
186-
return nil, fmt.Errorf("failed to parse history URI for archival task: %w", err)
187-
}
188-
visibilityURIString := namespaceEntry.VisibilityArchivalState().URI
189-
visibilityURI, err := carchiver.NewURI(visibilityURIString)
190-
if err != nil {
191-
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
192-
1,
193-
metrics.NamespaceTag(namespaceName.String()),
194-
metrics.FailureTag("invalid_visibility_uri"),
195-
)
196-
logger.Error(
197-
"Failed to parse visibility URI.",
198-
tag.ArchivalURI(visibilityURIString),
199-
tag.Error(err),
200-
)
201-
return nil, fmt.Errorf("failed to parse visibility URI for archival task: %w", err)
202-
}
203200
workflowAttributes, err := e.relocatableAttributesFetcher.Fetch(ctx, mutableState)
204201
if err != nil {
205202
return nil, err

service/history/archival_queue_task_executor_test.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,18 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
7373
}
7474
},
7575
},
76+
{
77+
Name: "URIs are not read for empty targets",
78+
Configure: func(p *params) {
79+
p.HistoryConfig.ClusterEnabled = false
80+
p.VisibilityConfig.ClusterEnabled = false
81+
// we set the URIs to invalid values which would produce errors if they were read
82+
// we should not read these URIs because history and visibility archival are disabled
83+
p.HistoryURI = "invalid_uri"
84+
p.VisibilityURI = "invalid_uri"
85+
p.ExpectArchive = false
86+
},
87+
},
7688
{
7789
Name: "history archival disabled for namespace",
7890
Configure: func(p *params) {
@@ -105,11 +117,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
105117
Configure: func(p *params) {
106118
p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalDisabled
107119
p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalDisabled
108-
p.ExpectedErrorSubstrings = []string{
109-
"no archival targets",
110-
}
111120
p.ExpectArchive = false
112-
p.ExpectAddTask = false
113121
},
114122
},
115123
{

0 commit comments

Comments
 (0)