Skip to content

Commit c6a89c0

Browse files
authored
Workflow task state machine: minor renames (#3735)
1 parent 12e4162 commit c6a89c0

8 files changed

+74
-61
lines changed

service/history/configs/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ type Config struct {
209209
// Workflow task settings
210210
// DefaultWorkflowTaskTimeout the default workflow task timeout
211211
DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
212-
// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any workflow tasks
212+
// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any commands
213213
// So that workflow task will be scheduled to another worker(by clear stickyness)
214214
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
215215
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn

service/history/workflow/history_builder.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,9 @@ func (b *HistoryBuilder) AddWorkflowTaskScheduledEvent(
198198
taskQueue *taskqueuepb.TaskQueue,
199199
startToCloseTimeout *time.Duration,
200200
attempt int32,
201-
now time.Time,
201+
scheduleTime time.Time,
202202
) *historypb.HistoryEvent {
203-
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, now)
203+
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, scheduleTime)
204204
event.Attributes = &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{
205205
WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{
206206
TaskQueue: taskQueue,
@@ -216,9 +216,9 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
216216
scheduledEventID int64,
217217
requestID string,
218218
identity string,
219-
now time.Time,
219+
startTime time.Time,
220220
) *historypb.HistoryEvent {
221-
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, now)
221+
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime)
222222
event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
223223
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
224224
ScheduledEventId: scheduledEventID,

service/history/workflow/mutable_state.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ type (
138138
CheckResettable() error
139139
CloneToProto() *persistencespb.WorkflowMutableState
140140
RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error)
141-
CreateTransientWorkflowTask(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo
141+
GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo
142142
DeleteWorkflowTask()
143143
DeleteSignalRequested(requestID string)
144144
FlushBufferedEvents()
@@ -193,6 +193,7 @@ type (
193193
IsCurrentWorkflowGuaranteed() bool
194194
IsSignalRequested(requestID string) bool
195195
IsStickyTaskQueueEnabled() bool
196+
TaskQueue() *taskqueuepb.TaskQueue
196197
IsWorkflowExecutionRunning() bool
197198
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
198199
IsWorkflowPendingOnWorkflowTaskBackoff() bool

service/history/workflow/mutable_state_impl.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,19 @@ func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool {
607607
return ms.executionInfo.StickyTaskQueue != ""
608608
}
609609

610+
func (e *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue {
611+
if e.IsStickyTaskQueueEnabled() {
612+
return &taskqueuepb.TaskQueue{
613+
Name: e.executionInfo.StickyTaskQueue,
614+
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
615+
}
616+
}
617+
return &taskqueuepb.TaskQueue{
618+
Name: e.executionInfo.TaskQueue,
619+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
620+
}
621+
}
622+
610623
func (ms *MutableStateImpl) GetWorkflowType() *commonpb.WorkflowType {
611624
wType := &commonpb.WorkflowType{}
612625
wType.Name = ms.executionInfo.WorkflowTypeName
@@ -1304,7 +1317,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
13041317
return serviceerror.NewInternal("cannot clear transient workflow task when there are buffered events")
13051318
}
13061319
// no buffered event
1307-
resetWorkflowTaskInfo := &WorkflowTaskInfo{
1320+
emptyWorkflowTaskInfo := &WorkflowTaskInfo{
13081321
Version: common.EmptyVersion,
13091322
ScheduledEventID: common.EmptyEventID,
13101323
StartedEventID: common.EmptyEventID,
@@ -1317,7 +1330,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
13171330
TaskQueue: nil,
13181331
OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0),
13191332
}
1320-
ms.workflowTaskManager.UpdateWorkflowTask(resetWorkflowTaskInfo)
1333+
ms.workflowTaskManager.UpdateWorkflowTask(emptyWorkflowTaskInfo)
13211334
return nil
13221335
}
13231336

@@ -1774,11 +1787,11 @@ func (ms *MutableStateImpl) ReplicateWorkflowTaskStartedEvent(
17741787
return ms.workflowTaskManager.ReplicateWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID, startedEventID, requestID, timestamp)
17751788
}
17761789

1777-
func (ms *MutableStateImpl) CreateTransientWorkflowTask(
1790+
func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo(
17781791
workflowTask *WorkflowTaskInfo,
17791792
identity string,
17801793
) *historyspb.TransientWorkflowTaskInfo {
1781-
return ms.workflowTaskManager.CreateTransientWorkflowTaskEvents(workflowTask, identity)
1794+
return ms.workflowTaskManager.GetTransientWorkflowTaskInfo(workflowTask, identity)
17821795
}
17831796

17841797
// add BinaryCheckSum for the first workflowTaskCompletedID for auto-reset

service/history/workflow/mutable_state_mock.go

+28-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/workflow/workflow_task_state_machine.go

+18-35
Original file line numberDiff line numberDiff line change
@@ -250,16 +250,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
250250
return nil, m.ms.createInternalServerError(opTag)
251251
}
252252

253-
// Task queue and workflow task timeout should already be set from workflow execution started event
254-
taskQueue := &taskqueuepb.TaskQueue{}
255-
if m.ms.IsStickyTaskQueueEnabled() {
256-
taskQueue.Name = m.ms.executionInfo.StickyTaskQueue
257-
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
258-
} else {
259-
taskQueue.Name = m.ms.executionInfo.TaskQueue
260-
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL
261-
}
262-
263253
// Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for transient
264254
// workflow task and will cause in timeout processing to not work for transient workflow tasks
265255
if m.ms.HasBufferedEvents() {
@@ -280,21 +270,27 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
280270
}
281271
}
282272

283-
var newWorkflowTaskEvent *historypb.HistoryEvent
284-
scheduledEventID := m.ms.GetNextEventID() // we will generate the schedule event later for repeatedly failing workflow tasks
285-
// Avoid creating new history events when workflow tasks are continuously failing
286-
scheduledTime := m.ms.timeSource.Now().UTC()
273+
scheduleTime := m.ms.timeSource.Now().UTC()
287274
attempt := m.ms.executionInfo.WorkflowTaskAttempt
275+
// TaskQueue should already be set from workflow execution started event.
276+
taskQueue := m.ms.TaskQueue()
277+
// DefaultWorkflowTaskTimeout should already be set from workflow execution started event.
288278
startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attempt)
279+
280+
var scheduledEvent *historypb.HistoryEvent
281+
var scheduledEventID int64
282+
289283
if attempt == 1 {
290-
newWorkflowTaskEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
284+
scheduledEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
291285
taskQueue,
292286
startToCloseTimeout,
293287
attempt,
294-
m.ms.timeSource.Now(),
288+
scheduleTime,
295289
)
296-
scheduledEventID = newWorkflowTaskEvent.GetEventId()
297-
scheduledTime = timestamp.TimeValue(newWorkflowTaskEvent.GetEventTime())
290+
scheduledEventID = scheduledEvent.GetEventId()
291+
} else {
292+
// WorkflowTaskScheduledEvent will be created later.
293+
scheduledEventID = m.ms.GetNextEventID()
298294
}
299295

300296
workflowTask, err := m.ReplicateWorkflowTaskScheduledEvent(
@@ -303,7 +299,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
303299
taskQueue,
304300
startToCloseTimeout,
305301
attempt,
306-
&scheduledTime,
302+
&scheduleTime,
307303
originalScheduledTimestamp,
308304
)
309305
if err != nil {
@@ -446,12 +442,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
446442
m.beforeAddWorkflowTaskCompletedEvent()
447443
if workflowTask.Attempt > 1 {
448444
// Create corresponding WorkflowTaskSchedule and WorkflowTaskStarted events for workflow tasks we have been retrying
449-
taskQueue := &taskqueuepb.TaskQueue{
450-
Name: m.ms.executionInfo.TaskQueue,
451-
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
452-
}
453445
scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
454-
taskQueue,
446+
m.ms.TaskQueue(),
455447
workflowTask.WorkflowTaskTimeout,
456448
workflowTask.Attempt,
457449
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
@@ -684,7 +676,7 @@ func (m *workflowTaskStateMachine) GetWorkflowTaskInfo(
684676
return nil, false
685677
}
686678

687-
func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents(
679+
func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo(
688680
workflowTask *WorkflowTaskInfo,
689681
identity string,
690682
) *historyspb.TransientWorkflowTaskInfo {
@@ -735,15 +727,6 @@ func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents(
735727
}
736728

737729
func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo {
738-
taskQueue := &taskqueuepb.TaskQueue{}
739-
if m.ms.IsStickyTaskQueueEnabled() {
740-
taskQueue.Name = m.ms.executionInfo.StickyTaskQueue
741-
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
742-
} else {
743-
taskQueue.Name = m.ms.executionInfo.TaskQueue
744-
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL
745-
}
746-
747730
return &WorkflowTaskInfo{
748731
Version: m.ms.executionInfo.WorkflowTaskVersion,
749732
ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId,
@@ -753,7 +736,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo {
753736
Attempt: m.ms.executionInfo.WorkflowTaskAttempt,
754737
StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime,
755738
ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime,
756-
TaskQueue: taskQueue,
739+
TaskQueue: m.ms.TaskQueue(),
757740
OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime,
758741
}
759742
}

service/history/workflowTaskHandler.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,15 @@ func newWorkflowTaskHandler(
117117
config *configs.Config,
118118
shard shard.Context,
119119
searchAttributesMapper searchattribute.Mapper,
120+
hasBufferedEvents bool,
120121
) *workflowTaskHandlerImpl {
121122

122123
return &workflowTaskHandlerImpl{
123124
identity: identity,
124125
workflowTaskCompletedID: workflowTaskCompletedID,
125126

126127
// internal state
127-
hasBufferedEvents: mutableState.HasBufferedEvents(),
128+
hasBufferedEvents: hasBufferedEvents,
128129
workflowTaskFailedCause: nil,
129130
activityNotStartedCancelled: false,
130131
newMutableState: nil,

service/history/workflowTaskHandlerCallbacks.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
481481
handler.config,
482482
handler.shard,
483483
handler.searchAttributesMapper,
484+
hasUnhandledEvents,
484485
)
485486

486487
if responseMutations, err = workflowTaskHandler.handleCommands(
@@ -726,7 +727,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) createRecordWorkflowTaskStarted
726727
response.ScheduledTime = workflowTask.ScheduledTime
727728
response.StartedTime = workflowTask.StartedTime
728729

729-
response.TransientWorkflowTask = ms.CreateTransientWorkflowTask(workflowTask, identity)
730+
response.TransientWorkflowTask = ms.GetTransientWorkflowTaskInfo(workflowTask, identity)
730731

731732
currentBranchToken, err := ms.GetCurrentBranchToken()
732733
if err != nil {

0 commit comments

Comments
 (0)