Skip to content

Commit b40a4ac

Browse files
authored
Fix activity eager execution on workflow close (#3868)
1 parent 3501f67 commit b40a4ac

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed

service/history/historyEngine_test.go

+82
Original file line numberDiff line numberDiff line change
@@ -1852,6 +1852,88 @@ func (s *engineSuite) TestRespondWorkflowTaskCompleted_ActivityEagerExecution_Ca
18521852
s.Equal(int64(3), resp.StartedResponse.PreviousStartedEventId)
18531853
}
18541854

1855+
func (s *engineSuite) TestRespondWorkflowTaskCompleted_ActivityEagerExecution_WorkflowClosed() {
1856+
namespaceID := tests.NamespaceID
1857+
we := commonpb.WorkflowExecution{
1858+
WorkflowId: tests.WorkflowID,
1859+
RunId: tests.RunID,
1860+
}
1861+
tl := "testTaskQueue"
1862+
tt := &tokenspb.Task{
1863+
Attempt: 1,
1864+
NamespaceId: namespaceID.String(),
1865+
WorkflowId: tests.WorkflowID,
1866+
RunId: we.GetRunId(),
1867+
ScheduledEventId: 2,
1868+
}
1869+
taskToken, _ := tt.Marshal()
1870+
identity := "testIdentity"
1871+
input := payloads.EncodeString("input")
1872+
1873+
ms := workflow.TestLocalMutableState(s.mockHistoryEngine.shard, s.eventsCache,
1874+
tests.LocalNamespaceEntry, log.NewTestLogger(), we.GetRunId())
1875+
addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 90*time.Second, 200*time.Second, identity)
1876+
wt := addWorkflowTaskScheduledEvent(ms)
1877+
addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity)
1878+
1879+
scheduleToCloseTimeout := timestamp.DurationPtr(90 * time.Second)
1880+
scheduleToStartTimeout := timestamp.DurationPtr(10 * time.Second)
1881+
startToCloseTimeout := timestamp.DurationPtr(50 * time.Second)
1882+
heartbeatTimeout := timestamp.DurationPtr(5 * time.Second)
1883+
commands := []*commandpb.Command{
1884+
{
1885+
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
1886+
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
1887+
ActivityId: "activity1",
1888+
ActivityType: &commonpb.ActivityType{Name: "activity_type1"},
1889+
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
1890+
Input: input,
1891+
ScheduleToCloseTimeout: scheduleToCloseTimeout,
1892+
ScheduleToStartTimeout: scheduleToStartTimeout,
1893+
StartToCloseTimeout: startToCloseTimeout,
1894+
HeartbeatTimeout: heartbeatTimeout,
1895+
RequestEagerExecution: true,
1896+
}},
1897+
},
1898+
{
1899+
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
1900+
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
1901+
Result: payloads.EncodeString("complete"),
1902+
}},
1903+
},
1904+
}
1905+
1906+
wfMs := workflow.TestCloneToProto(ms)
1907+
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs}
1908+
1909+
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
1910+
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
1911+
1912+
resp, err := s.mockHistoryEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
1913+
NamespaceId: tests.NamespaceID.String(),
1914+
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
1915+
TaskToken: taskToken,
1916+
Commands: commands,
1917+
Identity: identity,
1918+
ReturnNewWorkflowTask: true,
1919+
},
1920+
})
1921+
s.NoError(err)
1922+
ms2 := s.getMutableState(tests.NamespaceID, we)
1923+
s.Equal(int64(7), ms2.GetNextEventID()) // activity scheduled, workflow completed
1924+
s.Equal(int64(3), ms2.GetExecutionInfo().LastWorkflowTaskStartedEventId)
1925+
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, ms2.GetExecutionState().State)
1926+
s.False(ms2.HasPendingWorkflowTask())
1927+
1928+
activityInfo, ok := ms2.GetActivityByActivityID("activity1")
1929+
s.True(ok)
1930+
s.Equal(int64(5), activityInfo.ScheduledEventId) // activity scheduled
1931+
s.Equal(common.EmptyEventID, activityInfo.StartedEventId) // activity not started
1932+
1933+
s.Len(resp.ActivityTasks, 0)
1934+
s.Nil(resp.StartedResponse)
1935+
}
1936+
18551937
func (s *engineSuite) TestRespondWorkflowTaskCompleted_WorkflowTaskHeartbeatTimeout() {
18561938
namespaceID := tests.NamespaceID
18571939
we := commonpb.WorkflowExecution{

service/history/workflowTaskHandler.go

+7
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,13 @@ func (handler *workflowTaskHandlerImpl) handlePostCommandEagerExecuteActivity(
333333
_ context.Context,
334334
attr *commandpb.ScheduleActivityTaskCommandAttributes,
335335
) (workflowTaskResponseMutation, error) {
336+
if !handler.mutableState.IsWorkflowExecutionRunning() {
337+
// workflow closed in the same workflow task
338+
// this function is executed as a callback after all workflow commands
339+
// are handled, so need to check for workflow completion case.
340+
return nil, nil
341+
}
342+
336343
ai, ok := handler.mutableState.GetActivityByActivityID(attr.ActivityId)
337344
if !ok {
338345
// activity cancelled in the same worflow task

0 commit comments

Comments
 (0)