Skip to content

Commit 32a4686

Browse files
authored
Pass workflowTask object to completion methods of mutable state (#3897)
1 parent 52d1ba0 commit 32a4686

21 files changed

+208
-264
lines changed

service/history/historyEngine2_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ func (s *engine2Suite) TestRecordWorkflowTaskStartedIfTaskAlreadyCompleted() {
381381
tl := "testTaskQueue"
382382

383383
ms := s.createExecutionStartedState(workflowExecution, tl, identity, true)
384-
addWorkflowTaskCompletedEvent(ms, int64(2), int64(3), identity)
384+
addWorkflowTaskCompletedEvent(&s.Suite, ms, int64(2), int64(3), identity)
385385

386386
wfMs := workflow.TestCloneToProto(ms)
387387
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs}
@@ -555,7 +555,7 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() {
555555
activityInput := payloads.EncodeString("input1")
556556

557557
ms := s.createExecutionStartedState(workflowExecution, tl, identity, true)
558-
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, int64(2), int64(3), identity)
558+
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, int64(2), int64(3), identity)
559559
scheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second)
560560

561561
ms1 := workflow.TestCloneToProto(ms)
@@ -1002,6 +1002,7 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_Excee
10021002
GetMapper(tests.Namespace).
10031003
Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil).
10041004
AnyTimes()
1005+
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
10051006

10061007
s.historyEngine.shard.GetConfig().NumPendingChildExecutionsLimit = func(namespace string) int {
10071008
return 5
@@ -1651,7 +1652,7 @@ func (s *engine2Suite) TestRecordChildExecutionCompleted() {
16511652
wt := addWorkflowTaskScheduledEvent(ms)
16521653
workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, "testTaskQueue", uuid.New())
16531654
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
1654-
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1655+
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
16551656

16561657
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
16571658
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
@@ -1831,7 +1832,7 @@ func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_InitiatedEvent
18311832
wt := addWorkflowTaskScheduledEvent(ms)
18321833
workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskQueueName, uuid.New())
18331834
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
1834-
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1835+
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
18351836
initiatedEvent, ci := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
18361837
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
18371838

service/history/historyEngine_test.go

+45-41
Large diffs are not rendered by default.

service/history/ndc/branch_manager_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,7 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() {
244244
VersionHistories: versionHistories,
245245
}).AnyTimes()
246246
s.mockMutableState.EXPECT().AddWorkflowTaskFailedEvent(
247-
workflowTask.ScheduledEventID,
248-
workflowTask.StartedEventID,
247+
workflowTask,
249248
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
250249
nil,
251250
consts.IdentityHistoryService,

service/history/ndc/workflow.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,7 @@ func (r *WorkflowImpl) failWorkflowTask(
238238
}
239239

240240
if _, err := r.mutableState.AddWorkflowTaskFailedEvent(
241-
workflowTask.ScheduledEventID,
242-
workflowTask.StartedEventID,
241+
workflowTask,
243242
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
244243
nil,
245244
consts.IdentityHistoryService,

service/history/ndc/workflow_resetter.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -501,8 +501,7 @@ func (r *workflowResetterImpl) failWorkflowTask(
501501
}
502502

503503
_, err = resetMutableState.AddWorkflowTaskFailedEvent(
504-
workflowTask.ScheduledEventID,
505-
workflowTask.StartedEventID,
504+
workflowTask,
506505
enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW,
507506
failure.NewResetWorkflowFailure(resetReason, nil),
508507
consts.IdentityHistoryService,

service/history/ndc/workflow_resetter_test.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() {
392392
consts.IdentityHistoryService,
393393
).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil)
394394
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
395-
workflowTaskStart.ScheduledEventID,
396-
workflowTaskStart.StartedEventID,
395+
workflowTaskStart,
397396
enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW,
398397
failure.NewResetWorkflowFailure(resetReason, nil),
399398
consts.IdentityHistoryService,
@@ -433,8 +432,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskStarted() {
433432
}
434433
mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTask, true).AnyTimes()
435434
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
436-
workflowTask.ScheduledEventID,
437-
workflowTask.StartedEventID,
435+
workflowTask,
438436
enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW,
439437
failure.NewResetWorkflowFailure(resetReason, nil),
440438
consts.IdentityHistoryService,
@@ -534,8 +532,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() {
534532
mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes()
535533
mutableState.EXPECT().GetInFlightWorkflowTask().Return(workflowTask, true)
536534
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
537-
workflowTask.ScheduledEventID,
538-
workflowTask.StartedEventID,
535+
workflowTask,
539536
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
540537
nil,
541538
consts.IdentityHistoryService,

service/history/ndc/workflow_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,7 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
286286
}
287287
s.mockMutableState.EXPECT().GetInFlightWorkflowTask().Return(inFlightWorkflowTask, true)
288288
s.mockMutableState.EXPECT().AddWorkflowTaskFailedEvent(
289-
inFlightWorkflowTask.ScheduledEventID,
290-
inFlightWorkflowTask.StartedEventID,
289+
inFlightWorkflowTask,
291290
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
292291
nil,
293292
consts.IdentityHistoryService,

service/history/timerQueueActiveTaskExecutor.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTaskTimeoutTask(
332332
enumspb.TIMEOUT_TYPE_START_TO_CLOSE,
333333
)
334334
if _, err := mutableState.AddWorkflowTaskTimedOutEvent(
335-
workflowTask.ScheduledEventID,
336-
workflowTask.StartedEventID,
335+
workflowTask,
337336
); err != nil {
338337
return err
339338
}

service/history/timerQueueActiveTaskExecutor_test.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Fire() {
232232
wt := addWorkflowTaskScheduledEvent(mutableState)
233233
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
234234
wt.StartedEventID = event.GetEventId()
235-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
235+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
236236

237237
timerID := "timer"
238238
timerTimeout := 2 * time.Second
@@ -302,7 +302,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() {
302302
wt := addWorkflowTaskScheduledEvent(mutableState)
303303
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
304304
wt.StartedEventID = event.GetEventId()
305-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
305+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
306306

307307
timerID := "timer"
308308
timerTimeout := 2 * time.Second
@@ -366,7 +366,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo
366366
wt := addWorkflowTaskScheduledEvent(mutableState)
367367
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
368368
wt.StartedEventID = event.GetEventId()
369-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
369+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
370370

371371
taskqueue := "taskqueue"
372372
activityID := "activity"
@@ -445,7 +445,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo
445445
wt := addWorkflowTaskScheduledEvent(mutableState)
446446
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
447447
wt.StartedEventID = event.GetEventId()
448-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
448+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
449449

450450
identity := "identity"
451451
taskqueue := "taskqueue"
@@ -526,7 +526,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli
526526
wt := addWorkflowTaskScheduledEvent(mutableState)
527527
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
528528
wt.StartedEventID = event.GetEventId()
529-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
529+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
530530

531531
identity := "identity"
532532
taskqueue := "taskqueue"
@@ -619,7 +619,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli
619619
wt := addWorkflowTaskScheduledEvent(mutableState)
620620
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
621621
wt.StartedEventID = event.GetEventId()
622-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
622+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
623623

624624
taskqueue := "taskqueue"
625625
activityID := "activity"
@@ -705,7 +705,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli
705705
wt := addWorkflowTaskScheduledEvent(mutableState)
706706
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
707707
wt.StartedEventID = event.GetEventId()
708-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
708+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
709709

710710
identity := "identity"
711711
taskqueue := "taskqueue"
@@ -794,7 +794,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat
794794
wt := addWorkflowTaskScheduledEvent(mutableState)
795795
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
796796
wt.StartedEventID = event.GetEventId()
797-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
797+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
798798

799799
identity := "identity"
800800
taskqueue := "taskqueue"
@@ -1033,7 +1033,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowBackoffTimer_Noop() {
10331033
wt := addWorkflowTaskScheduledEvent(mutableState)
10341034
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
10351035
wt.StartedEventID = event.GetEventId()
1036-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1036+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
10371037

10381038
timerTask := &tasks.WorkflowBackoffTimerTask{
10391039
WorkflowKey: definition.NewWorkflowKey(
@@ -1084,7 +1084,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestActivityRetryTimer_Fire() {
10841084
wt := addWorkflowTaskScheduledEvent(mutableState)
10851085
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
10861086
wt.StartedEventID = event.GetEventId()
1087-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1087+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
10881088

10891089
taskqueue := "taskqueue"
10901090
activityID := "activity"
@@ -1173,7 +1173,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestActivityRetryTimer_Noop() {
11731173
wt := addWorkflowTaskScheduledEvent(mutableState)
11741174
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
11751175
wt.StartedEventID = event.GetEventId()
1176-
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1176+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
11771177

11781178
identity := "identity"
11791179
taskqueue := "taskqueue"
@@ -1250,7 +1250,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Fire() {
12501250
wt := addWorkflowTaskScheduledEvent(mutableState)
12511251
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
12521252
wt.StartedEventID = startEvent.GetEventId()
1253-
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1253+
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
12541254

12551255
timerTask := &tasks.WorkflowTimeoutTask{
12561256
WorkflowKey: definition.NewWorkflowKey(
@@ -1310,7 +1310,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Retry() {
13101310
wt := addWorkflowTaskScheduledEvent(mutableState)
13111311
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
13121312
wt.StartedEventID = startEvent.GetEventId()
1313-
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1313+
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
13141314

13151315
timerTask := &tasks.WorkflowTimeoutTask{
13161316
WorkflowKey: definition.NewWorkflowKey(
@@ -1367,7 +1367,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Cron() {
13671367
wt := addWorkflowTaskScheduledEvent(mutableState)
13681368
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
13691369
wt.StartedEventID = startEvent.GetEventId()
1370-
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1370+
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
13711371

13721372
timerTask := &tasks.WorkflowTimeoutTask{
13731373
WorkflowKey: definition.NewWorkflowKey(
@@ -1423,7 +1423,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_WorkflowExpired(
14231423
wt := addWorkflowTaskScheduledEvent(mutableState)
14241424
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
14251425
wt.StartedEventID = startEvent.GetEventId()
1426-
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
1426+
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
14271427

14281428
timerTask := &tasks.WorkflowTimeoutTask{
14291429
WorkflowKey: definition.NewWorkflowKey(

0 commit comments

Comments
 (0)