Skip to content

Commit d43e112

Browse files
authored
Fix fail of speculative workflow task (#3898)
1 parent 30264ca commit d43e112

File tree

3 files changed

+240
-29
lines changed

3 files changed

+240
-29
lines changed

service/history/historyEngine2_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1017,8 +1017,9 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_Excee
10171017
})
10181018

10191019
s.Error(err)
1020-
s.Assert().Equal([]string{"the number of pending child workflow executions, 5, has reached the per-workflow" +
1021-
" limit of 5"}, s.errorMessages)
1020+
s.IsType(&serviceerror.InvalidArgument{}, err)
1021+
s.Len(s.errorMessages, 1)
1022+
s.Equal("the number of pending child workflow executions, 5, has reached the per-workflow limit of 5", s.errorMessages[0])
10221023
}
10231024

10241025
func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {

service/history/workflow/workflow_task_state_machine.go

+24-3
Original file line numberDiff line numberDiff line change
@@ -504,8 +504,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
504504
workflowTask.Attempt,
505505
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
506506
)
507+
workflowTask.ScheduledEventID = scheduledEvent.GetEventId()
507508
startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent(
508-
scheduledEvent.GetEventId(),
509+
workflowTask.ScheduledEventID,
509510
workflowTask.RequestID,
510511
request.GetIdentity(),
511512
timestamp.TimeValue(workflowTask.StartedTime),
@@ -539,9 +540,29 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent(
539540
forkEventVersion int64,
540541
) (*historypb.HistoryEvent, error) {
541542

543+
if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
544+
// Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for speculative WT.
545+
scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
546+
m.ms.TaskQueue(),
547+
workflowTask.WorkflowTaskTimeout,
548+
workflowTask.Attempt,
549+
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
550+
)
551+
workflowTask.ScheduledEventID = scheduledEvent.GetEventId()
552+
startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent(
553+
workflowTask.ScheduledEventID,
554+
workflowTask.RequestID,
555+
identity,
556+
timestamp.TimeValue(workflowTask.StartedTime),
557+
)
558+
// TODO (alex-update): Do we need to call next line here same as in AddWorkflowTaskCompletedEvent?
559+
m.ms.hBuilder.FlushAndCreateNewBatch()
560+
workflowTask.StartedEventID = startedEvent.GetEventId()
561+
}
562+
542563
var event *historypb.HistoryEvent
543-
// Only emit WorkflowTaskFailedEvent if workflow task is not transient and not speculative.
544-
if !m.ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
564+
// Only emit WorkflowTaskFailedEvent if workflow task is not transient.
565+
if !m.ms.IsTransientWorkflowTask() {
545566
event = m.ms.hBuilder.AddWorkflowTaskFailedEvent(
546567
workflowTask.ScheduledEventID,
547568
workflowTask.StartedEventID,

tests/update_workflow_test.go

+213-24
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package tests
2626

2727
import (
28+
"errors"
2829
"strconv"
2930
"testing"
3031
"time"
@@ -40,6 +41,7 @@ import (
4041
updatepb "go.temporal.io/api/update/v1"
4142
"go.temporal.io/api/workflowservice/v1"
4243

44+
"go.temporal.io/server/common/debug"
4345
"go.temporal.io/server/common/payloads"
4446
"go.temporal.io/server/common/primitives/timestamp"
4547
)
@@ -455,12 +457,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_Reject() {
455457
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
456458

457459
request := &workflowservice.StartWorkflowExecutionRequest{
458-
RequestId: uuid.New(),
459-
Namespace: s.namespace,
460-
WorkflowId: id,
461-
WorkflowType: workflowType,
462-
TaskQueue: taskQueue,
463-
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
460+
RequestId: uuid.New(),
461+
Namespace: s.namespace,
462+
WorkflowId: id,
463+
WorkflowType: workflowType,
464+
TaskQueue: taskQueue,
464465
}
465466

466467
startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
@@ -608,12 +609,11 @@ func (s *integrationSuite) TestUpdateWorkflow_NewWorkflowTask_Reject() {
608609
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
609610

610611
request := &workflowservice.StartWorkflowExecutionRequest{
611-
RequestId: uuid.New(),
612-
Namespace: s.namespace,
613-
WorkflowId: id,
614-
WorkflowType: workflowType,
615-
TaskQueue: taskQueue,
616-
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
612+
RequestId: uuid.New(),
613+
Namespace: s.namespace,
614+
WorkflowId: id,
615+
WorkflowType: workflowType,
616+
TaskQueue: taskQueue,
617617
}
618618

619619
startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
@@ -781,12 +781,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndAcc
781781
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
782782

783783
request := &workflowservice.StartWorkflowExecutionRequest{
784-
RequestId: uuid.New(),
785-
Namespace: s.namespace,
786-
WorkflowId: id,
787-
WorkflowType: workflowType,
788-
TaskQueue: taskQueue,
789-
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
784+
RequestId: uuid.New(),
785+
Namespace: s.namespace,
786+
WorkflowId: id,
787+
WorkflowType: workflowType,
788+
TaskQueue: taskQueue,
790789
}
791790

792791
startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
@@ -1072,12 +1071,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndRej
10721071
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
10731072

10741073
request := &workflowservice.StartWorkflowExecutionRequest{
1075-
RequestId: uuid.New(),
1076-
Namespace: s.namespace,
1077-
WorkflowId: id,
1078-
WorkflowType: workflowType,
1079-
TaskQueue: taskQueue,
1080-
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
1074+
RequestId: uuid.New(),
1075+
Namespace: s.namespace,
1076+
WorkflowId: id,
1077+
WorkflowType: workflowType,
1078+
TaskQueue: taskQueue,
10811079
}
10821080

10831081
startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
@@ -1329,3 +1327,194 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndRej
13291327
16 WorkflowTaskCompleted
13301328
17 WorkflowExecutionCompleted`, events)
13311329
}
1330+
1331+
func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage() {
1332+
id := "integration-update-workflow-test-7"
1333+
wt := "integration-update-workflow-test-7-type"
1334+
tq := "integration-update-workflow-test-7-task-queue"
1335+
1336+
workflowType := &commonpb.WorkflowType{Name: wt}
1337+
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
1338+
1339+
request := &workflowservice.StartWorkflowExecutionRequest{
1340+
RequestId: uuid.New(),
1341+
Namespace: s.namespace,
1342+
WorkflowId: id,
1343+
WorkflowType: workflowType,
1344+
TaskQueue: taskQueue,
1345+
// Some short but reasonable timeout because there is a wait for it in the test.
1346+
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second * debug.TimeoutMultiplier),
1347+
}
1348+
1349+
startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
1350+
s.NoError(err)
1351+
1352+
we := &commonpb.WorkflowExecution{
1353+
WorkflowId: id,
1354+
RunId: startResp.GetRunId(),
1355+
}
1356+
1357+
wtHandlerCalls := 0
1358+
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
1359+
wtHandlerCalls++
1360+
switch wtHandlerCalls {
1361+
case 1:
1362+
return []*commandpb.Command{{
1363+
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
1364+
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
1365+
ActivityId: strconv.Itoa(1),
1366+
ActivityType: &commonpb.ActivityType{Name: "activity_type_1"},
1367+
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
1368+
ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour),
1369+
}},
1370+
}}, nil
1371+
case 2:
1372+
s.EqualHistory(`
1373+
1 WorkflowExecutionStarted
1374+
2 WorkflowTaskScheduled
1375+
3 WorkflowTaskStarted
1376+
4 WorkflowTaskCompleted
1377+
5 ActivityTaskScheduled
1378+
6 WorkflowTaskScheduled
1379+
7 WorkflowTaskStarted`, history)
1380+
return nil, nil
1381+
case 3:
1382+
s.Fail("should not be called because messageHandler returns error")
1383+
return nil, nil
1384+
case 4:
1385+
s.EqualHistory(`
1386+
1 WorkflowExecutionStarted
1387+
2 WorkflowTaskScheduled
1388+
3 WorkflowTaskStarted
1389+
4 WorkflowTaskCompleted
1390+
5 ActivityTaskScheduled
1391+
6 WorkflowTaskScheduled
1392+
7 WorkflowTaskStarted
1393+
8 WorkflowTaskFailed
1394+
9 WorkflowTaskScheduled
1395+
10 WorkflowTaskStarted`, history)
1396+
return []*commandpb.Command{{
1397+
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
1398+
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
1399+
Result: payloads.EncodeString("done"),
1400+
}},
1401+
}}, nil
1402+
default:
1403+
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
1404+
return nil, nil
1405+
}
1406+
}
1407+
1408+
msgHandlerCalls := 0
1409+
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
1410+
msgHandlerCalls++
1411+
switch msgHandlerCalls {
1412+
case 1:
1413+
return nil, nil
1414+
case 2:
1415+
updRequestMsg := task.Messages[0]
1416+
s.EqualValues(6, updRequestMsg.GetEventId())
1417+
1418+
// Emulate bug in worker/SDK update handler code. Return malformed acceptance response.
1419+
return []*protocolpb.Message{
1420+
{
1421+
Id: uuid.New(),
1422+
ProtocolInstanceId: "some-random-wrong-id",
1423+
SequencingId: nil,
1424+
Body: marshalAny(s, &updatepb.Acceptance{
1425+
AcceptedRequestMessageId: updRequestMsg.GetId(),
1426+
AcceptedRequestSequencingEventId: updRequestMsg.GetEventId(),
1427+
AcceptedRequest: nil, // must not be nil.
1428+
}),
1429+
},
1430+
}, nil
1431+
case 3:
1432+
// 2nd attempt doesn't have any updates attached to it.
1433+
s.Empty(task.Messages)
1434+
wtHandlerCalls++ // because it won't be called for case 3 but counter should be in sync.
1435+
// Fail WT one more time. Although 2nd attempt is normal WT, it is also transient and shouldn't appear in the history.
1436+
// Returning error will cause the poller to fail WT.
1437+
return nil, errors.New("malformed request")
1438+
case 4:
1439+
return nil, nil
1440+
default:
1441+
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
1442+
return nil, nil
1443+
}
1444+
}
1445+
1446+
poller := &TaskPoller{
1447+
Engine: s.engine,
1448+
Namespace: s.namespace,
1449+
TaskQueue: taskQueue,
1450+
WorkflowTaskHandler: wtHandler,
1451+
MessageHandler: msgHandler,
1452+
Logger: s.Logger,
1453+
T: s.T(),
1454+
}
1455+
1456+
// Start activity using existing workflow task.
1457+
_, err = poller.PollAndProcessWorkflowTask(true, false)
1458+
s.NoError(err)
1459+
1460+
type UpdateResult struct {
1461+
Response *workflowservice.UpdateWorkflowExecutionResponse
1462+
Err error
1463+
}
1464+
updateResultCh := make(chan UpdateResult)
1465+
updateWorkflowFn := func() {
1466+
updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{
1467+
Namespace: s.namespace,
1468+
WorkflowExecution: we,
1469+
Request: &updatepb.Request{
1470+
Meta: &updatepb.Meta{UpdateId: uuid.New()},
1471+
Input: &updatepb.Input{
1472+
Name: "update_handler",
1473+
Args: payloads.EncodeString("update args"),
1474+
},
1475+
},
1476+
})
1477+
s.NoError(err1)
1478+
updateResultCh <- UpdateResult{Response: updateResponse, Err: err1}
1479+
}
1480+
go updateWorkflowFn()
1481+
time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server.
1482+
1483+
// Try to accept update in workflow: get malformed response.
1484+
_, err = poller.PollAndProcessWorkflowTask(false, false)
1485+
s.Error(err)
1486+
s.Equal(err.Error(), "BadUpdateWorkflowExecutionMessage: accepted_request is not set on update.Acceptance message body.")
1487+
// New normal (but transient) WT will be created but not returned.
1488+
1489+
updateResult := <-updateResultCh
1490+
s.NoError(updateResult.Err)
1491+
// TODO (alex-update): this is wrong. Caller shouldn't get this error if WT failed.
1492+
s.Equal("update cleared, please retry", updateResult.Response.GetOutcome().GetFailure().GetMessage())
1493+
1494+
// Try to accept update in workflow 2nd time: get error. Poller will fail WT.
1495+
_, err = poller.PollAndProcessWorkflowTask(false, false)
1496+
// The error is from RespondWorkflowTaskFailed, which should go w/o error.
1497+
s.NoError(err)
1498+
1499+
// Complete workflow.
1500+
_, err = poller.PollAndProcessWorkflowTask(false, false)
1501+
s.NoError(err)
1502+
1503+
s.Equal(4, wtHandlerCalls)
1504+
s.Equal(4, msgHandlerCalls)
1505+
1506+
events := s.getHistory(s.namespace, we)
1507+
s.EqualHistoryEvents(`
1508+
1 WorkflowExecutionStarted
1509+
2 WorkflowTaskScheduled
1510+
3 WorkflowTaskStarted
1511+
4 WorkflowTaskCompleted
1512+
5 ActivityTaskScheduled
1513+
6 WorkflowTaskScheduled
1514+
7 WorkflowTaskStarted
1515+
8 WorkflowTaskFailed
1516+
9 WorkflowTaskScheduled
1517+
10 WorkflowTaskStarted
1518+
11 WorkflowTaskCompleted
1519+
12 WorkflowExecutionCompleted`, events)
1520+
}

0 commit comments

Comments
 (0)