Skip to content

Commit 661880f

Browse files
authored
Refactor transient workflow task state machine (#3740)
1 parent 2b761b4 commit 661880f

6 files changed

+157
-138
lines changed

service/history/ndc/branch_manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (r *BranchMgrImpl) flushBufferedEvents(
170170
// check whether there are buffered events, if so, flush it
171171
// NOTE: buffered events does not show in version history or next event id
172172
if !r.mutableState.HasBufferedEvents() {
173-
if r.mutableState.HasTransientWorkflowTask() {
173+
if r.mutableState.HasInFlightWorkflowTask() && r.mutableState.IsTransientWorkflowTask() {
174174
if err := r.mutableState.ClearTransientWorkflowTask(); err != nil {
175175
return nil, 0, err
176176
}

service/history/ndc/branch_manager_test.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ func (s *branchMgrSuite) TestClearTransientWorkflowTask() {
193193

194194
s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastWriteVersion, nil).AnyTimes()
195195
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
196-
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(true).AnyTimes()
196+
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
197+
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(true).AnyTimes()
197198
s.mockMutableState.EXPECT().ClearTransientWorkflowTask().Return(nil).AnyTimes()
198199

199200
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
@@ -285,7 +286,8 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchAppendable_NoMissingEve
285286

286287
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes()
287288
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
288-
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
289+
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
290+
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()
289291

290292
doContinue, index, err := s.nDCBranchMgr.prepareVersionHistory(
291293
context.Background(),
@@ -316,7 +318,8 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchAppendable_MissingEvent
316318
s.NoError(err)
317319

318320
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
319-
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
321+
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
322+
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()
320323
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
321324
NamespaceId: s.namespaceID,
322325
WorkflowId: s.workflowID,
@@ -357,7 +360,8 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_NoMissing
357360
newBranchToken := []byte("some random new branch token")
358361

359362
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
360-
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
363+
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
364+
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()
361365
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
362366
NamespaceId: s.namespaceID,
363367
WorkflowId: s.workflowID,
@@ -416,7 +420,8 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_MissingEv
416420
})
417421

418422
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
419-
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
423+
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
424+
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()
420425
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
421426
NamespaceId: s.namespaceID,
422427
WorkflowId: s.workflowID,

service/history/workflow/mutable_state.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ type (
182182
GetWorkflowType() *commonpb.WorkflowType
183183
GetWorkflowStateStatus() (enumsspb.WorkflowExecutionState, enumspb.WorkflowExecutionStatus)
184184
GetQueryRegistry() QueryRegistry
185-
HasTransientWorkflowTask() bool
185+
IsTransientWorkflowTask() bool
186186
ClearTransientWorkflowTask() error
187187
HasBufferedEvents() bool
188188
HasInFlightWorkflowTask() bool

service/history/workflow/mutable_state_impl.go

+12-16
Original file line numberDiff line numberDiff line change
@@ -607,15 +607,15 @@ func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool {
607607
return ms.executionInfo.StickyTaskQueue != ""
608608
}
609609

610-
func (e *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue {
611-
if e.IsStickyTaskQueueEnabled() {
610+
func (ms *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue {
611+
if ms.IsStickyTaskQueueEnabled() {
612612
return &taskqueuepb.TaskQueue{
613-
Name: e.executionInfo.StickyTaskQueue,
613+
Name: ms.executionInfo.StickyTaskQueue,
614614
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
615615
}
616616
}
617617
return &taskqueuepb.TaskQueue{
618-
Name: e.executionInfo.TaskQueue,
618+
Name: ms.executionInfo.TaskQueue,
619619
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
620620
}
621621
}
@@ -1294,25 +1294,18 @@ func (ms *MutableStateImpl) GetInFlightWorkflowTask() (*WorkflowTaskInfo, bool)
12941294
return ms.workflowTaskManager.GetInFlightWorkflowTask()
12951295
}
12961296

1297-
func (ms *MutableStateImpl) HasTransientWorkflowTask() bool {
1298-
workflowTask, ok := ms.GetInFlightWorkflowTask()
1299-
if !ok {
1300-
return false
1301-
}
1302-
return workflowTask.ScheduledEventID >= ms.GetNextEventID()
1297+
func (ms *MutableStateImpl) IsTransientWorkflowTask() bool {
1298+
return ms.executionInfo.WorkflowTaskAttempt > 1
13031299
}
13041300

13051301
func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
1306-
workflowTask, ok := ms.GetInFlightWorkflowTask()
1307-
if !ok {
1302+
if !ms.HasInFlightWorkflowTask() {
13081303
return serviceerror.NewInternal("cannot clear transient workflow task when task is missing")
13091304
}
1310-
1311-
if workflowTask.ScheduledEventID < ms.GetNextEventID() {
1305+
if !ms.IsTransientWorkflowTask() {
13121306
return serviceerror.NewInternal("cannot clear transient workflow task when task is not transient")
13131307
}
1314-
// workflowTask.ScheduledEventID >= ms.GetNextEventID()
1315-
// this is transient workflow
1308+
// this is transient workflow task
13161309
if ms.HasBufferedEvents() {
13171310
return serviceerror.NewInternal("cannot clear transient workflow task when there are buffered events")
13181311
}
@@ -1791,6 +1784,9 @@ func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo(
17911784
workflowTask *WorkflowTaskInfo,
17921785
identity string,
17931786
) *historyspb.TransientWorkflowTaskInfo {
1787+
if !ms.IsTransientWorkflowTask() {
1788+
return nil
1789+
}
17941790
return ms.workflowTaskManager.GetTransientWorkflowTaskInfo(workflowTask, identity)
17951791
}
17961792

service/history/workflow/mutable_state_mock.go

+14-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)