Skip to content

Commit cbd73e0

Browse files
authored
Synchronous workflow update (#3822)
1 parent d9bac92 commit cbd73e0

27 files changed

+2773
-412
lines changed

api/historyservice/v1/request_response.pb.go

+217-176
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/enums/defaults.go

+6
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,9 @@ func SetDefaultResetReapplyType(f *enumspb.ResetReapplyType) {
6969
*f = enumspb.RESET_REAPPLY_TYPE_SIGNAL
7070
}
7171
}
72+
73+
func SetDefaultUpdateWorkflowExecutionLifecycleStage(f *enumspb.UpdateWorkflowExecutionLifecycleStage) {
74+
if *f == enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED {
75+
*f = enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
76+
}
77+
}

common/log/tag/values.go

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ var (
4242
WorkflowActionUpsertWorkflowSearchAttributes = workflowAction("add-workflow-upsert-search-attributes-event")
4343
WorkflowActionWorkflowPropertiesModified = workflowAction("add-workflow-properties-modified-event")
4444

45+
// workflow update
46+
WorkflowActionUpdateAccepted = workflowAction("add-workflow-update-accepted-event")
47+
WorkflowActionUpdateCompleted = workflowAction("add-workflow-update-completed-event")
48+
4549
// workflow task
4650
WorkflowActionWorkflowTaskScheduled = workflowAction("add-workflowtask-scheduled-event")
4751
WorkflowActionWorkflowTaskStarted = workflowAction("add-workflowtask-started-event")

common/metrics/metric_defs.go

+109-105
Large diffs are not rendered by default.

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ require (
4545
go.opentelemetry.io/otel/sdk v1.11.2
4646
go.opentelemetry.io/otel/sdk/metric v0.34.0
4747
go.temporal.io/api v1.15.1-0.20230130221739-35f91d43296f
48-
go.temporal.io/sdk v1.20.1-0.20230125015921-1fe6824cedfe
48+
go.temporal.io/sdk v1.20.1-0.20230131233224-093eabe1f8d1
4949
go.temporal.io/version v0.3.0
5050
go.uber.org/atomic v1.10.0
5151
go.uber.org/fx v1.18.2

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -843,8 +843,8 @@ go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
843843
go.temporal.io/api v1.15.1-0.20230125004443-42737e40d339/go.mod h1:dbi2T2T/PVw0jOHF0pdc8XFDFGQoMQeNq7F0ClTKwlU=
844844
go.temporal.io/api v1.15.1-0.20230130221739-35f91d43296f h1:wNX+g/F7lQWpzftw/Su3T9a1guXB04zqaX0lnHmPQLI=
845845
go.temporal.io/api v1.15.1-0.20230130221739-35f91d43296f/go.mod h1:u3qLbaVTffmcZQbf9ueB+16LKmhkftH79SJOV517MDk=
846-
go.temporal.io/sdk v1.20.1-0.20230125015921-1fe6824cedfe h1:mlyDvuKMuxCTcEgmeZs+Qd/x69jyT/Jwq1ULhJGfPbU=
847-
go.temporal.io/sdk v1.20.1-0.20230125015921-1fe6824cedfe/go.mod h1:KtPAB/8+lVNm0aP0W5wLCyxXfHoQJbYNfM4+SfVnsJc=
846+
go.temporal.io/sdk v1.20.1-0.20230131233224-093eabe1f8d1 h1:gj+p52mx2cq1UCOfHvxK2GtEZO6/qAhltKGLKbJe+qY=
847+
go.temporal.io/sdk v1.20.1-0.20230131233224-093eabe1f8d1/go.mod h1:KtPAB/8+lVNm0aP0W5wLCyxXfHoQJbYNfM4+SfVnsJc=
848848
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
849849
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
850850
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

+1
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ message RespondWorkflowTaskCompletedRequest {
198198
message RespondWorkflowTaskCompletedResponse {
199199
RecordWorkflowTaskStartedResponse started_response = 1;
200200
repeated temporal.api.workflowservice.v1.PollActivityTaskQueueResponse activity_tasks = 2;
201+
int64 reset_history_event_id = 3;
201202
}
202203

203204
message RespondWorkflowTaskFailedRequest {

service/frontend/errors.go

+5
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ var (
7979
errReasonNotSet = serviceerror.NewInvalidArgument("Reason is not set on request.")
8080
errBatchOperationNotSet = serviceerror.NewInvalidArgument("Batch operation is not set on request.")
8181

82+
errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.")
83+
errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.")
84+
errUpdateNameNotSet = serviceerror.NewInvalidArgument("Update name is not set on request.")
85+
errUpdateIDTooLong = serviceerror.NewInvalidArgument("UpdateId length exceeds limit.")
86+
8287
errPageSizeTooBigMessage = "PageSize is larger than allowed %d."
8388

8489
errSearchAttributeIsReservedMessage = "Search attribute %s is reserved by system."

service/frontend/workflow_handler.go

+32-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
schedpb "go.temporal.io/api/schedule/v1"
4545
"go.temporal.io/api/serviceerror"
4646
taskqueuepb "go.temporal.io/api/taskqueue/v1"
47+
updatepb "go.temporal.io/api/update/v1"
4748
workflowpb "go.temporal.io/api/workflow/v1"
4849
"go.temporal.io/api/workflowservice/v1"
4950
"google.golang.org/grpc/health"
@@ -959,7 +960,8 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(
959960
}
960961

961962
completedResp := &workflowservice.RespondWorkflowTaskCompletedResponse{
962-
ActivityTasks: histResp.ActivityTasks,
963+
ActivityTasks: histResp.ActivityTasks,
964+
ResetHistoryEventId: histResp.ResetHistoryEventId,
963965
}
964966
if request.GetReturnNewWorkflowTask() && histResp != nil && histResp.StartedResponse != nil {
965967
taskToken := &tokenspb.Task{
@@ -3647,6 +3649,35 @@ func (wh *WorkflowHandler) UpdateWorkflowExecution(
36473649
return nil, errRequestNotSet
36483650
}
36493651

3652+
if err := validateExecution(request.GetWorkflowExecution()); err != nil {
3653+
return nil, err
3654+
}
3655+
3656+
if request.GetRequest().GetMeta() == nil {
3657+
return nil, errUpdateMetaNotSet
3658+
}
3659+
3660+
if len(request.GetRequest().GetMeta().GetUpdateId()) > wh.config.MaxIDLengthLimit() {
3661+
return nil, errUpdateIDTooLong
3662+
}
3663+
3664+
if request.GetRequest().GetMeta().GetUpdateId() == "" {
3665+
request.GetRequest().GetMeta().UpdateId = uuid.New()
3666+
}
3667+
3668+
if request.GetRequest().GetInput() == nil {
3669+
return nil, errUpdateInputNotSet
3670+
}
3671+
3672+
if request.GetRequest().GetInput().GetName() == "" {
3673+
return nil, errUpdateNameNotSet
3674+
}
3675+
3676+
if request.GetWaitPolicy() == nil {
3677+
request.WaitPolicy = &updatepb.WaitPolicy{}
3678+
}
3679+
enums.SetDefaultUpdateWorkflowExecutionLifecycleStage(&request.GetWaitPolicy().LifecycleStage)
3680+
36503681
nsID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
36513682
if err != nil {
36523683
return nil, err
+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package updateworkflow
26+
27+
import (
28+
"context"
29+
"time"
30+
31+
commonpb "go.temporal.io/api/common/v1"
32+
updatepb "go.temporal.io/api/update/v1"
33+
"go.temporal.io/api/workflowservice/v1"
34+
35+
enumsspb "go.temporal.io/server/api/enums/v1"
36+
"go.temporal.io/server/api/historyservice/v1"
37+
"go.temporal.io/server/api/matchingservice/v1"
38+
"go.temporal.io/server/common/definition"
39+
"go.temporal.io/server/common/namespace"
40+
"go.temporal.io/server/service/history/api"
41+
"go.temporal.io/server/service/history/consts"
42+
"go.temporal.io/server/service/history/shard"
43+
"go.temporal.io/server/service/history/workflow"
44+
)
45+
46+
func Invoke(
47+
ctx context.Context,
48+
req *historyservice.UpdateWorkflowExecutionRequest,
49+
shardCtx shard.Context,
50+
workflowConsistencyChecker api.WorkflowConsistencyChecker,
51+
matchingClient matchingservice.MatchingServiceClient,
52+
) (_ *historyservice.UpdateWorkflowExecutionResponse, retErr error) {
53+
54+
weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
55+
ctx,
56+
nil,
57+
api.BypassMutableStateConsistencyPredicate,
58+
definition.NewWorkflowKey(
59+
req.NamespaceId,
60+
req.Request.WorkflowExecution.WorkflowId,
61+
req.Request.WorkflowExecution.RunId,
62+
),
63+
)
64+
if err != nil {
65+
return nil, err
66+
}
67+
defer func() { weCtx.GetReleaseFn()(retErr) }()
68+
69+
ms := weCtx.GetMutableState()
70+
if !ms.IsWorkflowExecutionRunning() {
71+
return nil, consts.ErrWorkflowCompleted
72+
}
73+
74+
if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() {
75+
return nil, consts.ErrWorkflowExecutionNotFound
76+
}
77+
78+
upd, duplicate, removeFn := ms.UpdateRegistry().Add(req.GetRequest().GetRequest())
79+
if removeFn != nil {
80+
defer removeFn()
81+
}
82+
83+
// If WT is scheduled, but not started, updates will be attached to it, when WT is started.
84+
// If WT has already started, new speculative WT will be created when started WT completes.
85+
// If update is duplicate, then WT for this update was already created.
86+
createNewWorkflowTask := !ms.HasPendingWorkflowTask() && duplicate == false
87+
88+
if createNewWorkflowTask {
89+
// This will try not to add an event but will create speculative WT in mutable state.
90+
// Task generation will be skipped if WT is created as speculative.
91+
wt, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
92+
if err != nil {
93+
return nil, err
94+
}
95+
if wt.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
96+
// This should never happen because WT is created as normal (despite speculative is requested)
97+
// only if there were buffered events and because there were no pending WT, there can't be buffered events.
98+
return nil, consts.ErrWorkflowTaskStateInconsistent
99+
}
100+
101+
// It is important to release workflow lock before calling matching.
102+
weCtx.GetReleaseFn()(nil)
103+
err = addWorkflowTaskToMatching(ctx, shardCtx, ms, matchingClient, wt, namespace.ID(req.GetNamespaceId()))
104+
if err != nil {
105+
return nil, err
106+
}
107+
} else {
108+
weCtx.GetReleaseFn()(nil)
109+
}
110+
111+
updOutcome, err := upd.WaitOutcome(ctx)
112+
if err != nil {
113+
return nil, err
114+
}
115+
resp := &historyservice.UpdateWorkflowExecutionResponse{
116+
Response: &workflowservice.UpdateWorkflowExecutionResponse{
117+
UpdateRef: &updatepb.UpdateRef{
118+
WorkflowExecution: &commonpb.WorkflowExecution{
119+
WorkflowId: weCtx.GetWorkflowKey().WorkflowID,
120+
RunId: weCtx.GetWorkflowKey().RunID,
121+
},
122+
UpdateId: req.GetRequest().GetRequest().GetMeta().GetUpdateId(),
123+
},
124+
Outcome: updOutcome,
125+
},
126+
}
127+
128+
return resp, nil
129+
}
130+
131+
// TODO (alex-update): Consider moving this func to a better place.
132+
func addWorkflowTaskToMatching(
133+
ctx context.Context,
134+
shardCtx shard.Context,
135+
ms workflow.MutableState,
136+
matchingClient matchingservice.MatchingServiceClient,
137+
task *workflow.WorkflowTaskInfo,
138+
nsID namespace.ID,
139+
) error {
140+
// TODO (alex-update): Timeout calculation is copied from somewhere else. Extract func instead?
141+
var taskScheduleToStartTimeout *time.Duration
142+
if ms.TaskQueue().GetName() != task.TaskQueue.GetName() {
143+
taskScheduleToStartTimeout = ms.GetExecutionInfo().StickyScheduleToStartTimeout
144+
} else {
145+
taskScheduleToStartTimeout = ms.GetExecutionInfo().WorkflowRunTimeout
146+
}
147+
148+
wfKey := ms.GetWorkflowKey()
149+
clock, err := shardCtx.NewVectorClock()
150+
if err != nil {
151+
return err
152+
}
153+
154+
_, err = matchingClient.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{
155+
NamespaceId: nsID.String(),
156+
Execution: &commonpb.WorkflowExecution{
157+
WorkflowId: wfKey.WorkflowID,
158+
RunId: wfKey.RunID,
159+
},
160+
TaskQueue: task.TaskQueue,
161+
ScheduledEventId: task.ScheduledEventID,
162+
ScheduleToStartTimeout: taskScheduleToStartTimeout,
163+
Clock: clock,
164+
})
165+
if err != nil {
166+
return err
167+
}
168+
169+
return nil
170+
}

0 commit comments

Comments
 (0)