Synchronous workflow update #3822
579b2cb to cf7bc20
Broadly speaking looks good. I think we can make the Update type more powerful to consolidate a few things but that can be TODO. Couple of questions as well.
	} else {
		return serviceerror.NewInvalidArgument(fmt.Sprintf("unknown message type: %v", message.GetBody().GetTypeUrl()))
	}
}
I think this function admits the possibility of Accept followed by Reject and vice-versa. Not necessary for this PR but we should probably separate message validation semantics from protocol validation semantics.
This method only validates sequencing: it requires that the update.Response (i.e. completion) message comes last. I am not sure I got your point.
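To make the two concerns in this exchange concrete, here is a minimal sketch of a sequencing check that also rejects Accept and Reject appearing together. `MessageKind`, `validateSequence`, and the exact rules are hypothetical and only illustrate the idea; the real function inspects the message body's type URL and may enforce different rules.

```go
package main

import (
	"errors"
	"fmt"
)

// MessageKind is a hypothetical stand-in for the update message types
// discussed in this thread.
type MessageKind int

const (
	Accept MessageKind = iota
	Reject
	Response
)

var errBadSequence = errors.New("invalid update message sequence")

// validateSequence checks the messages of a single update (assumed rules):
//   - no message kind may appear twice;
//   - Response (completion), if present, must be the last message;
//   - Accept and Reject are mutually exclusive.
func validateSequence(kinds []MessageKind) error {
	seen := map[MessageKind]bool{}
	for i, k := range kinds {
		if seen[k] {
			return fmt.Errorf("%w: duplicate message at index %d", errBadSequence, i)
		}
		if seen[Response] {
			return fmt.Errorf("%w: message at index %d follows Response", errBadSequence, i)
		}
		if (k == Accept && seen[Reject]) || (k == Reject && seen[Accept]) {
			return fmt.Errorf("%w: Accept and Reject both present", errBadSequence)
		}
		seen[k] = true
	}
	return nil
}

func main() {
	// A sequence that ends with Response passes the check.
	fmt.Println(validateSequence([]MessageKind{Accept, Response}))
	// Accept followed by Reject is refused by the stricter rule.
	fmt.Println(validateSequence([]MessageKind{Accept, Reject}))
}
```

Keeping the protocol-level rules in one function like this leaves per-message validation (size limits, required fields) free to live elsewhere.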
service/history/historyEngine.go
Outdated
}

req := request.GetRequest()
updateRegistry := ms.GetUpdateRegistry()
Will we go toward a more general ProtocolRegistry eventually or are we going to stick with protocol-type-specific registries?
I don't know yet. Currently, the update registry acts more like an "update callers registry", and it looks more natural to have it per user-facing API (i.e. UpdateWorkflowExecution). I think we need at least one more protocol implementation to make a call.
}

func (u *Update) MessageID() string {
	return u.messageID
This seems strange ... is this the messageID of the initial request message? Might just need a better function name here.
This is also gone.
	return startedEvent, workflowTask, err
}
func (m *workflowTaskStateMachine) skipWorkflowTaskCompleted(workflowTaskType enumsspb.WorkflowTaskType, request *workflowservice.RespondWorkflowTaskCompletedRequest) bool {
	if workflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE || len(request.GetCommands()) != 0 {
FWIW, this function actually gets easier if we end up going down the ProtocolCommand route.
	}

	return nil
}
TODO but later - we definitely want to do this dispatch within a protocol state machine. On the sdk-side we find the protocol object by protocol_instance_id and then just call HandleMessage.
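A rough sketch of the dispatch shape described above: look up a protocol object by its instance ID and hand the message to it. All names here (`Message`, `Protocol`, `Registry`, `Dispatch`) are hypothetical and are not taken from the server or SDK code; only the lookup by `protocol_instance_id` followed by a `HandleMessage` call comes from the comment.

```go
package main

import "fmt"

// Message is a hypothetical, simplified protocol message.
type Message struct {
	ProtocolInstanceID string
	Body               string
}

// Protocol is a hypothetical per-instance state machine.
type Protocol interface {
	HandleMessage(msg Message) error
}

// Registry maps protocol instance IDs to their state machines.
type Registry struct {
	instances map[string]Protocol
}

func NewRegistry() *Registry {
	return &Registry{instances: map[string]Protocol{}}
}

func (r *Registry) Register(id string, p Protocol) {
	r.instances[id] = p
}

// Dispatch finds the protocol instance for the message and delegates to it,
// so the caller needs no per-message-type branching.
func (r *Registry) Dispatch(msg Message) error {
	p, ok := r.instances[msg.ProtocolInstanceID]
	if !ok {
		return fmt.Errorf("unknown protocol instance: %q", msg.ProtocolInstanceID)
	}
	return p.HandleMessage(msg)
}

// recordingProtocol is a toy implementation that records the bodies it receives.
type recordingProtocol struct {
	handled []string
}

func (p *recordingProtocol) HandleMessage(msg Message) error {
	p.handled = append(p.handled, msg.Body)
	return nil
}

func main() {
	reg := NewRegistry()
	upd := &recordingProtocol{}
	reg.Register("update-1", upd)

	fmt.Println(reg.Dispatch(Message{ProtocolInstanceID: "update-1", Body: "accept"}))
	fmt.Println(reg.Dispatch(Message{ProtocolInstanceID: "missing", Body: "accept"}))
	fmt.Println(upd.handled)
}
```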
var newWorkflowTaskScheduledEventID int64
if createNewWorkflowTask {
	// TODO (alex-update): Need to support case when ReturnNewWorkflowTask=false and WT.Type=Speculative.
	// In this case WT needs to be added directly to matching.
	// Current implementation will create normal WT.
I understand now - good comment.
14e9758 to c60edb5
4a0c1b7 to bf053fa
#3848 should fix linter warnings.
7b32add to bd8d3ef
@@ -3647,6 +3649,35 @@ func (wh *WorkflowHandler) UpdateWorkflowExecution(
		return nil, errRequestNotSet
	}

	if err := validateExecution(request.GetWorkflowExecution()); err != nil {
Consider using a parsed type here instead: https://lexi-lambda.github.io/blog/2019/11/05/parse-don-t-validate/
Examples in Haskell are not the best way to convince people to follow your ideas :-) But I got the point. It is worth considering this approach project-wide.
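For readers who would rather see the idea in Go, a minimal sketch follows. `RawExecution`, `Execution`, and `ParseExecution` are hypothetical names, and the single rule (non-empty workflow ID) is an assumption for illustration, not what `validateExecution` actually checks.

```go
package main

import (
	"errors"
	"fmt"
)

// RawExecution is a hypothetical stand-in for the unvalidated request field.
type RawExecution struct {
	WorkflowID string
	RunID      string
}

// Execution is the parsed form. Its fields are unexported, so code in other
// packages can only obtain a populated value through ParseExecution.
type Execution struct {
	workflowID string
	runID      string
}

func (e Execution) WorkflowID() string { return e.workflowID }
func (e Execution) RunID() string      { return e.runID }

var errWorkflowIDNotSet = errors.New("workflow ID is not set")

// ParseExecution performs the check once and returns a type that carries the
// proof of validity, instead of returning only an error and leaving callers
// to keep passing the raw value around.
func ParseExecution(raw *RawExecution) (Execution, error) {
	if raw == nil || raw.WorkflowID == "" {
		return Execution{}, errWorkflowIDNotSet
	}
	return Execution{workflowID: raw.WorkflowID, runID: raw.RunID}, nil
}

// describe accepts only the parsed type, so it needs no defensive checks.
func describe(e Execution) string {
	return fmt.Sprintf("workflow=%s run=%s", e.WorkflowID(), e.RunID())
}

func main() {
	exec, err := ParseExecution(&RawExecution{WorkflowID: "wf-1", RunID: "run-1"})
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(describe(exec))

	_, err = ParseExecution(&RawExecution{})
	fmt.Println(err)
}
```

The guarantee only holds across package boundaries; inside the defining package the zero value of `Execution` can still be constructed directly.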
	return nil, consts.ErrWorkflowExecutionNotFound
}

upd, removeFn := ms.UpdateRegistry().Add(req.GetRequest().GetRequest())
Is there anything we can do to prevent this stuttering?
Those are all different "requests" and I hate this stuttering but:
historyUpdateRequest.GetFrontendUpdateRequest().GetUpdateRequest()
would be even worse.
There's a bit of tech debt added here, but most of it's tagged, and I think the importance of this change outweighs the cost.
There is even more tech debt in my head, and I admit, I contributed to it. I left a bunch of TODOs for myself and I promise to address them (and those from my head too).
Some other points we synced offline:
- Reapply updates (future work)
- Replication layer should not return a "not implemented" error for new event types
- Check workflow completion when applying messages
Anything else? I remember there may be 1-2 more?
// Always bypass task generation for speculative WT.
if workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
A speculative WT doesn't have a (start-to-close) timeout?
It seems like I need a special in-memory timer to handle this timeout. I made a note and will do it in a separate PR.
	updResponse.Size(),
	"Message body of type update.Response exceeds size limit.",
); err != nil {
	return handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE, err)
Will this add a workflow task failed event to the history for speculative WT?
Yes, this seems to be broken and will lead to a WTFailed event without preceding WTScheduled/WTStarted events. I will address this in a separate PR.
This was addressed here. Current behaviour: the first speculative WT failure is not written to the history, but a new WT is recreated as "normal" and gets all the corresponding events in the history. If it keeps failing, it will become transient. I need to think about whether this should be changed in the future.
bd8d3ef to a53977d
@@ -92,6 +92,8 @@ var (
	ErrWorkflowTaskNotScheduled = serviceerror.NewWorkflowNotReady("Workflow task is not scheduled yet.")
	// ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request.
	ErrNamespaceHandover = common.ErrNamespaceHandover
	// ErrWorkflowTaskStateInconsistent is error indicating workflow task state is inconsistent, for example there was no workflow task scheduled but buffered events are present.
	ErrWorkflowTaskStateInconsistent = serviceerror.NewUnavailable("Workflow task state is inconsistent.")
Internal error maybe? I don't think retry can help in that inconsistent state?
I think a retry could help: the WT can complete, buffered events might get flushed, and maybe something else. I would leave it as Unavailable for now.
return nil, serviceerror.NewUnimplemented("Workflow Update rebuild not implemented")
// TODO (alex-update): Async workflow update might require update to be restored in registry from Accepted event.
// Completed event will remove it from registry and notify update result pollers.
return nil, nil
Please add a comment here saying that no change is needed to mutable state and no task needs to be generated for those events.
Is it possible for update-related events to be in different batches (e.g. one batch contains Accepted and Completed is in another batch)? If so, we need to handle the case where only some batches are replicated and then failover happens.
Updated comment, and will think about event batches later.
a53977d to 238136a
238136a to 558a42d
What changed?
First version of synchronous workflow update, implemented with messages.
Corresponding API changes: temporalio/api#253.
Messages protocol implementation added #3843.
Why?
New feature "Synchronous workflow update", which allows synchronously updating a running workflow.
How did you test it?
New functional tests.
Potential risks
No risks.
Is hotfix candidate?
No.