Skip to content

Commit b572419

Browse files
Fix bug where we return nil when there's an error in the archival queue (#3723)
1 parent 0ad773b commit b572419

File tree

2 files changed

+200
-49
lines changed

2 files changed

+200
-49
lines changed

service/history/archival_queue_task_executor.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"context"
2929
"errors"
3030
"fmt"
31+
"time"
3132

3233
enumspb "go.temporal.io/api/enums/v1"
3334

@@ -115,7 +116,7 @@ func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Cont
115116
if err != nil {
116117
return err
117118
}
118-
return e.addDeletionTask(ctx, logger, task)
119+
return e.addDeletionTask(ctx, logger, task, request.CloseTime)
119120
}
120121

121122
// getArchiveTaskRequest returns an archival request for the given archive execution task.
@@ -230,7 +231,12 @@ func (e *archivalQueueTaskExecutor) getArchiveTaskRequest(
230231
}
231232

232233
// addDeletionTask adds a task to delete workflow history events from primary storage.
233-
func (e *archivalQueueTaskExecutor) addDeletionTask(ctx context.Context, logger log.Logger, task *tasks.ArchiveExecutionTask) error {
234+
func (e *archivalQueueTaskExecutor) addDeletionTask(
235+
ctx context.Context,
236+
logger log.Logger,
237+
task *tasks.ArchiveExecutionTask,
238+
closeTime *time.Time,
239+
) error {
234240
mutableState, err := e.loadAndVersionCheckMutableState(ctx, logger, task)
235241
if err != nil {
236242
return err
@@ -244,13 +250,9 @@ func (e *archivalQueueTaskExecutor) addDeletionTask(ctx context.Context, logger
244250
mutableState,
245251
e.shardContext.GetConfig(),
246252
)
247-
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
248-
if err != nil {
249-
return err
250-
}
251253
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
252254
if err != nil {
253-
return nil
255+
return err
254256
}
255257
err = e.shardContext.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
256258
ShardID: e.shardContext.GetShardID(),

service/history/archival_queue_task_executor_test.go

+191-42
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/stretchr/testify/require"
3636
enumspb "go.temporal.io/api/enums/v1"
3737
"go.temporal.io/api/serviceerror"
38+
workflowpb "go.temporal.io/api/workflow/v1"
3839

3940
"go.temporal.io/server/api/persistence/v1"
4041
carchiver "go.temporal.io/server/common/archiver"
@@ -130,17 +131,28 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
130131
{
131132
Name: "namespace not found",
132133
Configure: func(p *params) {
133-
p.Retention = nil
134134
p.GetNamespaceByIDError = &serviceerror.NamespaceNotFound{}
135+
// namespace not found means we should use default retention
135136
p.ExpectedDeleteTime = p.CloseTime.Add(24 * time.Hour)
136137
},
137138
},
139+
{
140+
Name: "get namespace internal error",
141+
Configure: func(p *params) {
142+
p.GetNamespaceByIDError = serviceerror.NewInternal("get namespace error")
143+
p.ExpectAddTask = false
144+
p.ExpectedErrorSubstrings = []string{
145+
"get namespace error",
146+
}
147+
},
148+
},
138149
{
139150
Name: "wrong task type",
140151
Configure: func(p *params) {
152+
version := p.Task.GetVersion()
141153
p.Task = &tasks.DeleteExecutionTask{
142154
WorkflowKey: p.WorkflowKey,
143-
Version: p.Version,
155+
Version: version,
144156
}
145157
p.ExpectArchive = false
146158
p.ExpectAddTask = false
@@ -182,8 +194,97 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
182194
{
183195
Name: "archiver error",
184196
Configure: func(p *params) {
185-
p.ArchiveError = errors.New("archive error")
186-
p.ExpectedErrorSubstrings = []string{"archive error"}
197+
p.ArchiveError = errors.New("archiver error")
198+
p.ExpectedErrorSubstrings = []string{"archiver error"}
199+
p.ExpectAddTask = false
200+
},
201+
},
202+
{
203+
Name: "get workflow close time error",
204+
Configure: func(p *params) {
205+
p.GetWorkflowCloseTimeError = errors.New("get workflow close time error")
206+
p.ExpectedErrorSubstrings = []string{"get workflow close time error"}
207+
p.ExpectArchive = false
208+
p.ExpectAddTask = false
209+
},
210+
},
211+
{
212+
Name: "get current branch token error",
213+
Configure: func(p *params) {
214+
p.GetCurrentBranchTokenError = errors.New("get current branch token error")
215+
p.ExpectedErrorSubstrings = []string{"get current branch token error"}
216+
p.ExpectArchive = false
217+
p.ExpectAddTask = false
218+
},
219+
},
220+
{
221+
Name: "load mutable state error",
222+
Configure: func(p *params) {
223+
p.LoadMutableStateError = errors.New("load mutable state error")
224+
p.ExpectedErrorSubstrings = []string{"load mutable state error"}
225+
p.ExpectArchive = false
226+
p.ExpectAddTask = false
227+
},
228+
},
229+
{
230+
Name: "get or create workflow execution error",
231+
Configure: func(p *params) {
232+
p.GetOrCreateWorkflowExecutionError = errors.New("get or create workflow execution error")
233+
p.ExpectedErrorSubstrings = []string{"get or create workflow execution error"}
234+
p.ExpectArchive = false
235+
p.ExpectAddTask = false
236+
},
237+
},
238+
{
239+
Name: "get last write version error before archiving",
240+
Configure: func(p *params) {
241+
p.GetLastWriteVersionBeforeArchivalError = errors.New("get last write version error")
242+
p.ExpectedErrorSubstrings = []string{"get last write version error"}
243+
p.ExpectArchive = false
244+
p.ExpectAddTask = false
245+
},
246+
},
247+
{
248+
Name: "get last write version error after archiving",
249+
Configure: func(p *params) {
250+
p.GetLastWriteVersionAfterArchivalError = errors.New("get last write version error")
251+
p.ExpectedErrorSubstrings = []string{"get last write version error"}
252+
p.ExpectArchive = true
253+
p.ExpectAddTask = false
254+
},
255+
},
256+
{
257+
Name: "mutable state version does not match task version",
258+
Configure: func(p *params) {
259+
p.LastWriteVersionBeforeArchival = 1
260+
p.Task.(*tasks.ArchiveExecutionTask).Version = 2
261+
p.ExpectedErrorSubstrings = []string{"version mismatch"}
262+
p.ExpectArchive = false
263+
p.ExpectAddTask = false
264+
},
265+
},
266+
{
267+
Name: "last write version changed during archival",
268+
Configure: func(p *params) {
269+
p.LastWriteVersionAfterArchival = p.LastWriteVersionBeforeArchival + 1
270+
p.ExpectedErrorSubstrings = []string{"version mismatch"}
271+
p.ExpectArchive = true
272+
p.ExpectAddTask = false
273+
},
274+
},
275+
{
276+
Name: "close visibility task complete",
277+
Configure: func(p *params) {
278+
p.CloseVisibilityTaskCompleted = true
279+
},
280+
},
281+
{
282+
Name: "get workflow execution from visibility error",
283+
Configure: func(p *params) {
284+
p.CloseVisibilityTaskCompleted = true
285+
p.GetWorkflowExecutionError = errors.New("get workflow execution error")
286+
p.ExpectedErrorSubstrings = []string{"get workflow execution error"}
287+
p.ExpectArchive = false
187288
p.ExpectAddTask = false
188289
},
189290
},
@@ -209,10 +310,11 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
209310
// delete time = close time + retention
210311
// delete time = 2 minutes + 1 hour = 1 hour 2 minutes
211312
p.ExpectedDeleteTime = time.Unix(0, 0).Add(time.Minute * 2).Add(time.Hour)
212-
p.Version = 52
313+
p.LastWriteVersionBeforeArchival = 1
314+
p.LastWriteVersionAfterArchival = 1
213315
p.Task = &tasks.ArchiveExecutionTask{
214316
WorkflowKey: p.WorkflowKey,
215-
Version: p.Version,
317+
Version: 1,
216318
}
217319
p.HistoryURI = "test://history/archival"
218320
p.VisibilityURI = "test://visibility/archival"
@@ -268,7 +370,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
268370
cluster.TestCurrentClusterName,
269371
},
270372
},
271-
52,
373+
123,
272374
)
273375
namespaceRegistry.EXPECT().GetNamespaceName(namespaceEntry.ID()).
274376
Return(namespaceEntry.Name(), nil).AnyTimes()
@@ -278,19 +380,36 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
278380
if p.MutableStateExists {
279381
mutableState := workflow.NewMockMutableState(p.Controller)
280382
mutableState.EXPECT().IsWorkflowExecutionRunning().Return(p.IsWorkflowExecutionRunning).AnyTimes()
281-
mutableState.EXPECT().GetCurrentVersion().Return(p.Version).AnyTimes()
383+
mutableState.EXPECT().GetCurrentVersion().Return(p.LastWriteVersionBeforeArchival).AnyTimes()
282384
mutableState.EXPECT().GetWorkflowKey().Return(p.WorkflowKey).AnyTimes()
283-
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes()
284-
mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil).AnyTimes()
385+
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(
386+
mutableState,
387+
p.LoadMutableStateError,
388+
).AnyTimes()
389+
mutableState.EXPECT().GetCurrentBranchToken().Return(
390+
branchToken,
391+
p.GetCurrentBranchTokenError,
392+
).AnyTimes()
285393
mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes()
286394
mutableState.EXPECT().GetNextEventID().Return(int64(100)).AnyTimes()
287-
mutableState.EXPECT().GetLastWriteVersion().Return(int64(52), nil).AnyTimes()
288-
mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&p.CloseTime, nil).AnyTimes()
395+
mutableState.EXPECT().GetLastWriteVersion().Return(
396+
p.LastWriteVersionBeforeArchival,
397+
p.GetLastWriteVersionBeforeArchivalError,
398+
).MaxTimes(1)
399+
mutableState.EXPECT().GetLastWriteVersion().Return(
400+
p.LastWriteVersionAfterArchival,
401+
p.GetLastWriteVersionAfterArchivalError,
402+
).MaxTimes(1)
403+
mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(
404+
&p.CloseTime,
405+
p.GetWorkflowCloseTimeError,
406+
).AnyTimes()
289407
executionInfo := &persistence.WorkflowExecutionInfo{
290-
NamespaceId: tests.NamespaceID.String(),
291-
StartTime: &p.StartTime,
292-
ExecutionTime: &p.ExecutionTime,
293-
CloseTime: &p.CloseTime,
408+
NamespaceId: tests.NamespaceID.String(),
409+
StartTime: &p.StartTime,
410+
ExecutionTime: &p.ExecutionTime,
411+
CloseTime: &p.CloseTime,
412+
CloseVisibilityTaskCompleted: p.CloseVisibilityTaskCompleted,
294413
}
295414
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
296415
executionState := &persistence.WorkflowExecutionState{
@@ -304,7 +423,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
304423
task := ts[0]
305424
assert.Equal(t, p.WorkflowKey, task.WorkflowKey)
306425
assert.Zero(t, task.TaskID)
307-
assert.Equal(t, p.Version, task.Version)
426+
assert.Equal(t, p.LastWriteVersionBeforeArchival, task.Version)
308427
assert.Equal(t, branchToken, task.BranchToken)
309428
assert.True(t, task.WorkflowDataAlreadyArchived)
310429
assert.Equal(t, p.ExpectedDeleteTime, task.VisibilityTimestamp)
@@ -324,10 +443,21 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
324443
})
325444
}
326445
} else {
327-
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(nil, nil).AnyTimes()
446+
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(
447+
nil,
448+
p.LoadMutableStateError,
449+
).AnyTimes()
328450
}
329-
workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
330-
Return(workflowContext, cache.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes()
451+
workflowCache.EXPECT().GetOrCreateWorkflowExecution(
452+
gomock.Any(),
453+
gomock.Any(),
454+
gomock.Any(),
455+
gomock.Any(),
456+
).Return(
457+
workflowContext,
458+
cache.ReleaseCacheFunc(func(err error) {}),
459+
p.GetOrCreateWorkflowExecutionError,
460+
).AnyTimes()
331461

332462
archivalMetadata := carchiver.NewMockArchivalMetadata(p.Controller)
333463
historyConfig := carchiver.NewMockArchivalConfig(p.Controller)
@@ -352,6 +482,15 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
352482
}
353483

354484
visibilityManager := manager.NewMockVisibilityManager(p.Controller)
485+
if p.CloseVisibilityTaskCompleted {
486+
visibilityManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(
487+
&manager.GetWorkflowExecutionResponse{Execution: &workflowpb.WorkflowExecutionInfo{
488+
Memo: nil,
489+
SearchAttributes: nil,
490+
}},
491+
p.GetWorkflowExecutionError,
492+
)
493+
}
355494

356495
executor := NewArchivalQueueTaskExecutor(
357496
a,
@@ -399,28 +538,38 @@ type testCase struct {
399538

400539
// params represents the parameters for a test within TestArchivalQueueTaskExecutor
401540
type params struct {
402-
Controller *gomock.Controller
403-
IsWorkflowExecutionRunning bool
404-
Retention *time.Duration
405-
Task tasks.Task
406-
ExpectedDeleteTime time.Time
407-
ExpectedErrorSubstrings []string
408-
ExpectArchive bool
409-
ExpectAddTask bool
410-
ExpectedTargets []archival.Target
411-
HistoryConfig archivalConfig
412-
VisibilityConfig archivalConfig
413-
WorkflowKey definition.WorkflowKey
414-
StartTime time.Time
415-
ExecutionTime time.Time
416-
CloseTime time.Time
417-
Version int64
418-
GetNamespaceByIDError error
419-
HistoryURI string
420-
VisibilityURI string
421-
MetricsHandler *metrics.MockHandler
422-
MutableStateExists bool
423-
ArchiveError error
541+
Controller *gomock.Controller
542+
IsWorkflowExecutionRunning bool
543+
Retention *time.Duration
544+
Task tasks.Task
545+
ExpectedDeleteTime time.Time
546+
ExpectedErrorSubstrings []string
547+
ExpectArchive bool
548+
ExpectAddTask bool
549+
ExpectedTargets []archival.Target
550+
HistoryConfig archivalConfig
551+
VisibilityConfig archivalConfig
552+
WorkflowKey definition.WorkflowKey
553+
StartTime time.Time
554+
ExecutionTime time.Time
555+
CloseTime time.Time
556+
GetNamespaceByIDError error
557+
HistoryURI string
558+
VisibilityURI string
559+
MetricsHandler *metrics.MockHandler
560+
MutableStateExists bool
561+
ArchiveError error
562+
GetWorkflowCloseTimeError error
563+
GetCurrentBranchTokenError error
564+
CloseVisibilityTaskCompleted bool
565+
ExpectGetWorkflowExecution bool
566+
GetWorkflowExecutionError error
567+
LoadMutableStateError error
568+
GetOrCreateWorkflowExecutionError error
569+
LastWriteVersionBeforeArchival int64
570+
GetLastWriteVersionBeforeArchivalError error
571+
LastWriteVersionAfterArchival int64
572+
GetLastWriteVersionAfterArchivalError error
424573
}
425574

426575
// archivalConfig represents the user configuration of archival for the cluster and namespace

0 commit comments

Comments
 (0)