Skip to content

Commit 728f6ce

Browse files
Improve archival_test (#3719)
1 parent 0b4df6a commit 728f6ce

File tree

1 file changed

+64
-11
lines changed

1 file changed

+64
-11
lines changed

host/archival_test.go

+64-11
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ import (
4646
"go.temporal.io/api/workflowservice/v1"
4747

4848
"go.temporal.io/server/common"
49+
"go.temporal.io/server/common/archiver"
4950
"go.temporal.io/server/common/convert"
5051
"go.temporal.io/server/common/dynamicconfig"
5152
"go.temporal.io/server/common/log/tag"
5253
"go.temporal.io/server/common/payloads"
5354
"go.temporal.io/server/common/persistence"
55+
"go.temporal.io/server/common/primitives"
5456
"go.temporal.io/server/common/primitives/timestamp"
57+
"go.temporal.io/server/common/searchattribute"
5558
)
5659

5760
const (
@@ -100,7 +103,7 @@ func (s *archivalSuite) TestArchival_TimerQueueProcessor() {
100103
WorkflowId: workflowID,
101104
RunId: runID,
102105
}
103-
s.True(s.isHistoryArchived(s.archivalNamespace, execution))
106+
s.True(s.isArchived(s.archivalNamespace, execution))
104107
s.True(s.isHistoryDeleted(execution))
105108
s.True(s.isMutableStateDeleted(namespaceID, execution))
106109
}
@@ -121,7 +124,7 @@ func (s *archivalSuite) TestArchival_ContinueAsNew() {
121124
WorkflowId: workflowID,
122125
RunId: runID,
123126
}
124-
s.True(s.isHistoryArchived(s.archivalNamespace, execution))
127+
s.True(s.isArchived(s.archivalNamespace, execution))
125128
s.True(s.isHistoryDeleted(execution))
126129
s.True(s.isMutableStateDeleted(namespaceID, execution))
127130
}
@@ -143,7 +146,7 @@ func (s *archivalSuite) TestArchival_ArchiverWorker() {
143146
WorkflowId: workflowID,
144147
RunId: runID,
145148
}
146-
s.True(s.isHistoryArchived(s.archivalNamespace, execution))
149+
s.True(s.isArchived(s.archivalNamespace, execution))
147150
s.True(s.isHistoryDeleted(execution))
148151
s.True(s.isMutableStateDeleted(namespaceID, execution))
149152
}
@@ -201,18 +204,68 @@ func (s *IntegrationBase) getNamespaceID(namespace string) string {
201204
return namespaceResp.NamespaceInfo.GetId()
202205
}
203206

204-
func (s *archivalSuite) isHistoryArchived(namespace string, execution *commonpb.WorkflowExecution) bool {
205-
request := &workflowservice.GetWorkflowExecutionHistoryRequest{
206-
Namespace: s.archivalNamespace,
207-
Execution: execution,
208-
}
207+
// isArchived returns true if both the workflow history and workflow visibility are archived.
208+
func (s *archivalSuite) isArchived(namespace string, execution *commonpb.WorkflowExecution) bool {
209+
serviceName := string(primitives.HistoryService)
210+
historyURI, err := archiver.NewURI(s.testCluster.archiverBase.historyURI)
211+
s.NoError(err)
212+
historyArchiver, err := s.testCluster.archiverBase.provider.GetHistoryArchiver(
213+
historyURI.Scheme(),
214+
serviceName,
215+
)
216+
s.NoError(err)
217+
218+
visibilityURI, err := archiver.NewURI(s.testCluster.archiverBase.visibilityURI)
219+
s.NoError(err)
220+
visibilityArchiver, err := s.testCluster.archiverBase.provider.GetVisibilityArchiver(
221+
visibilityURI.Scheme(),
222+
serviceName,
223+
)
224+
s.NoError(err)
209225

210226
for i := 0; i < retryLimit; i++ {
211-
getHistoryResp, err := s.engine.GetWorkflowExecutionHistory(NewContext(), request)
212-
if err == nil && getHistoryResp != nil && getHistoryResp.GetArchived() {
227+
ctx := NewContext()
228+
if i > 0 {
229+
time.Sleep(retryBackoffTime)
230+
}
231+
namespaceID := s.getNamespaceID(namespace)
232+
var historyResponse *archiver.GetHistoryResponse
233+
historyResponse, err = historyArchiver.Get(ctx, historyURI, &archiver.GetHistoryRequest{
234+
NamespaceID: namespaceID,
235+
WorkflowID: execution.GetWorkflowId(),
236+
RunID: execution.GetRunId(),
237+
PageSize: 1,
238+
})
239+
if err != nil {
240+
continue
241+
}
242+
if len(historyResponse.HistoryBatches) == 0 {
243+
continue
244+
}
245+
var visibilityResponse *archiver.QueryVisibilityResponse
246+
visibilityResponse, err = visibilityArchiver.Query(
247+
ctx,
248+
visibilityURI,
249+
&archiver.QueryVisibilityRequest{
250+
NamespaceID: namespaceID,
251+
PageSize: 1,
252+
Query: fmt.Sprintf(
253+
"WorkflowId = '%s' and RunId = '%s'",
254+
execution.GetWorkflowId(),
255+
execution.GetRunId(),
256+
),
257+
},
258+
searchattribute.NameTypeMap{},
259+
)
260+
if err != nil {
261+
continue
262+
}
263+
if len(visibilityResponse.Executions) > 0 {
213264
return true
214265
}
215-
time.Sleep(retryBackoffTime)
266+
}
267+
if err != nil {
268+
fmt.Println("isArchived failed with error: ", err)
216269
}
217270
return false
218271
}

0 commit comments

Comments
 (0)