Skip to content

Commit 6ef7749

Browse files
authored
Eager workflow dispatch (#3835)
* Eager workflow dispatch * Revert returning workflowTaskInfo from NewWorkflowWithSignal * Fix lint issues * Minor restructuring * Support eager start with TERMINATE_IF_RUNNING * Properly release workflow context * Add documentation and restructure for better readability * Fix lint * Run go-generate * Fix nolint directive * More restructuring, get rid of cyclo complexity * Fix task inflight condition * Address review comments * Address review comments * Fix missing reason tag value * Add missing nil check * Address review comments
1 parent dbeab31 commit 6ef7749

23 files changed

+1075
-523
lines changed

api/historyservice/v1/request_response.pb.go

+437-368
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,11 @@ const (
8787
EnableParentClosePolicyWorker = "system.enableParentClosePolicyWorker"
8888
// EnableStickyQuery indicates if sticky query should be enabled per namespace
8989
EnableStickyQuery = "system.enableStickyQuery"
90-
// EnableActivityEagerExecution indicates if acitivty eager execution is enabled per namespace
90+
// EnableActivityEagerExecution indicates if activity eager execution is enabled per namespace
9191
EnableActivityEagerExecution = "system.enableActivityEagerExecution"
92+
// EnableEagerWorkflowStart toggles "eager workflow start" - returning the first workflow task inline in the
93+
// response to a StartWorkflowExecution request and skipping the trip through matching.
94+
EnableEagerWorkflowStart = "system.enableEagerWorkflowStart"
9295
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
9396
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"
9497

common/metrics/metric_defs.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -1464,7 +1464,14 @@ var (
14641464
MessageTypeCompleteWorkflowExecutionUpdateCounter = NewCounterDef("complete_workflow_update_message")
14651465
MessageTypeRejectWorkflowExecutionUpdateCounter = NewCounterDef("reject_workflow_update_message")
14661466

1467-
ActivityEagerExecutionCounter = NewCounterDef("activity_eager_execution")
1467+
ActivityEagerExecutionCounter = NewCounterDef("activity_eager_execution")
1468+
// WorkflowEagerExecutionCounter is emitted any time eager workflow start is requested.
1469+
WorkflowEagerExecutionCounter = NewCounterDef("workflow_eager_execution")
1470+
// WorkflowEagerExecutionDeniedCounter is emitted any time eager workflow start is requested and the serer fell back
1471+
// to standard dispatch.
1472+
// Timeouts and failures are not counted in this metric.
1473+
// This metric has a "reason" tag attached to it to understand why eager start was denied.
1474+
WorkflowEagerExecutionDeniedCounter = NewCounterDef("workflow_eager_execution_denied")
14681475
EmptyCompletionCommandsCounter = NewCounterDef("empty_completion_commands")
14691476
MultipleCompletionCommandsCounter = NewCounterDef("multiple_completion_commands")
14701477
FailedWorkflowTasksCounter = NewCounterDef("failed_workflow_tasks")

common/metrics/tags.go

+12
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ const (
5151
commandType = "commandType"
5252
serviceName = "service_name"
5353
actionType = "action_type"
54+
// Generic reason tag can be used anywhere a reason is needed.
55+
reason = "reason"
5456

5557
namespaceAllValue = "all"
5658
unknownValue = "_unknown_"
@@ -278,3 +280,13 @@ func StringTag(key string, value string) Tag {
278280
func CacheTypeTag(value string) Tag {
279281
return &tagImpl{key: CacheTypeTagName, value: value}
280282
}
283+
284+
// ReasonString is just a string but the special type is defined here to remind callers of ReasonTag to limit the
285+
// cardinality of possible reasons.
286+
type ReasonString string
287+
288+
// ReasonTag is a generic tag can be used anywhere a reason is needed.
289+
// Make sure that the value is of limited cardinality.
290+
func ReasonTag(value ReasonString) Tag {
291+
return &tagImpl{key: reason, value: string(value)}
292+
}

config/dynamicconfig/development-cass.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,5 @@
3636
# constraints: {}
3737
#system.enableParentClosePolicyWorker:
3838
# - value: true
39+
system.enableEagerWorkflowStart:
40+
- value: true

config/dynamicconfig/development-sql.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
# constraints: {}
3737
#system.enableParentClosePolicyWorker:
3838
# - value: true
39+
system.enableEagerWorkflowStart:
40+
- value: true
3941
limit.maxIDLength:
4042
- value: 255
4143
constraints: {}

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

+2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ message StartWorkflowExecutionRequest {
6666
message StartWorkflowExecutionResponse {
6767
string run_id = 1;
6868
temporal.server.api.clock.v1.VectorClock clock = 2;
69+
// Set if request_eager_execution is set on the start request
70+
temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse eager_workflow_task = 3;
6971
}
7072

7173
message GetMutableStateRequest {

service/frontend/workflow_handler.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
407407
if err != nil {
408408
return nil, err
409409
}
410-
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId()}, nil
410+
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId(), EagerWorkflowTask: resp.GetEagerWorkflowTask()}, nil
411411
}
412412

413413
// GetWorkflowExecutionHistory returns the history of specified workflow execution. It fails with 'EntityNotExistError' if specified workflow
@@ -2866,6 +2866,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows
28662866
SupportsSchedules: true,
28672867
EncodedFailureAttributes: true,
28682868
UpsertMemo: true,
2869+
EagerWorkflowStart: true,
28692870
},
28702871
}, nil
28712872
}

service/history/api/create_workflow_util.go

+23-12
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,32 @@ func NewWorkflowWithSignal(
9999
return nil, err
100100
}
101101
}
102-
102+
requestEagerExecution := startRequest.StartRequest.GetRequestEagerExecution()
103103
// Generate first workflow task event if not child WF and no first workflow task backoff
104-
if err := GenerateFirstWorkflowTask(
104+
scheduledEventID, err := GenerateFirstWorkflowTask(
105105
newMutableState,
106106
startRequest.ParentExecutionInfo,
107107
startEvent,
108-
); err != nil {
108+
requestEagerExecution,
109+
)
110+
if err != nil {
109111
return nil, err
110112
}
111113

114+
// If first workflow task should back off (e.g. cron or workflow retry) a workflow task will not be scheduled.
115+
if requestEagerExecution && newMutableState.HasPendingWorkflowTask() {
116+
_, _, err = newMutableState.AddWorkflowTaskStartedEvent(
117+
scheduledEventID,
118+
startRequest.StartRequest.RequestId,
119+
startRequest.StartRequest.TaskQueue,
120+
startRequest.StartRequest.Identity,
121+
)
122+
if err != nil {
123+
// Unable to add WorkflowTaskStarted event to history
124+
return nil, err
125+
}
126+
}
127+
112128
newWorkflowContext := workflow.NewContext(
113129
shard,
114130
definition.NewWorkflowKey(
@@ -146,17 +162,13 @@ func GenerateFirstWorkflowTask(
146162
mutableState workflow.MutableState,
147163
parentInfo *workflowspb.ParentExecutionInfo,
148164
startEvent *historypb.HistoryEvent,
149-
) error {
150-
165+
bypassTaskGeneration bool,
166+
) (int64, error) {
151167
if parentInfo == nil {
152168
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
153-
if err := mutableState.AddFirstWorkflowTaskScheduled(
154-
startEvent,
155-
); err != nil {
156-
return err
157-
}
169+
return mutableState.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
158170
}
159-
return nil
171+
return 0, nil
160172
}
161173

162174
func NewWorkflowVersionCheck(
@@ -272,7 +284,6 @@ func ValidateStartWorkflowExecutionRequest(
272284
if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
273285
return err
274286
}
275-
276287
if err := ValidateStart(
277288
ctx,
278289
shard,

service/history/api/signalwithstartworkflow/signal_with_start_workflow.go

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func startAndSignalWorkflow(
8686
) (string, error) {
8787
workflowID := signalWithStartRequest.GetWorkflowId()
8888
runID := uuid.New().String()
89+
// TODO(bergundy): Support eager workflow task
8990
newWorkflowContext, err := api.NewWorkflowWithSignal(
9091
ctx,
9192
shard,

0 commit comments

Comments
 (0)