-
Notifications
You must be signed in to change notification settings - Fork 916
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Messages protocol implementation #3843
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ import ( | |
commonpb "go.temporal.io/api/common/v1" | ||
enumspb "go.temporal.io/api/enums/v1" | ||
failurepb "go.temporal.io/api/failure/v1" | ||
protocolpb "go.temporal.io/api/protocol/v1" | ||
"go.temporal.io/api/serviceerror" | ||
"go.temporal.io/api/workflowservice/v1" | ||
|
||
|
@@ -239,6 +240,31 @@ func (handler *workflowTaskHandlerImpl) handleCommand(ctx context.Context, comma | |
} | ||
} | ||
|
||
func (handler *workflowTaskHandlerImpl) handleMessages( | ||
ctx context.Context, | ||
messages []*protocolpb.Message, | ||
) error { | ||
if err := handler.attrValidator.validateMessages( | ||
messages, | ||
); err != nil { | ||
return err | ||
} | ||
|
||
for _, message := range messages { | ||
err := handler.handleMessage(ctx, message) | ||
if err != nil || handler.stopProcessing { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (handler *workflowTaskHandlerImpl) handleMessage(_ context.Context, _ *protocolpb.Message) error { | ||
|
||
return nil | ||
Comment on lines
+264
to
+265
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will have real code in #3822. |
||
} | ||
|
||
func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity( | ||
_ context.Context, | ||
attr *commandpb.ScheduleActivityTaskCommandAttributes, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -386,9 +386,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope)) | ||
} | ||
|
||
workflowTaskHeartbeating := request.GetForceCreateNewWorkflowTask() && len(request.Commands) == 0 | ||
workflowTaskHeartbeating := request.GetForceCreateNewWorkflowTask() && len(request.Commands) == 0 && len(request.Messages) == 0 | ||
var workflowTaskHeartbeatTimeout bool | ||
var completedEvent *historypb.HistoryEvent | ||
var responseMutations []workflowTaskResponseMutation | ||
|
||
if workflowTaskHeartbeating { | ||
namespace := namespaceEntry.Name() | ||
timeout := handler.config.WorkflowTaskHeartbeatTimeout(namespace.String()) | ||
|
@@ -423,11 +425,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
wtFailedCause *workflowTaskFailedCause | ||
activityNotStartedCancelled bool | ||
newMutableState workflow.MutableState | ||
|
||
hasUnhandledEvents bool | ||
responseMutations []workflowTaskResponseMutation | ||
) | ||
hasUnhandledEvents = ms.HasBufferedEvents() | ||
hasBufferedEvents := ms.HasBufferedEvents() | ||
|
||
if request.StickyAttributes == nil || request.StickyAttributes.WorkerTaskQueue == nil { | ||
handler.metricsHandler.Counter(metrics.CompleteWorkflowTaskWithStickyDisabledCounter.GetMetricName()).Record( | ||
|
@@ -481,7 +480,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
handler.config, | ||
handler.shard, | ||
handler.searchAttributesMapper, | ||
hasUnhandledEvents, | ||
hasBufferedEvents, | ||
) | ||
|
||
if responseMutations, err = workflowTaskHandler.handleCommands( | ||
|
@@ -491,6 +490,13 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
return nil, err | ||
} | ||
|
||
if err = workflowTaskHandler.handleMessages( | ||
ctx, | ||
request.Messages, | ||
); err != nil { | ||
return nil, err | ||
} | ||
|
||
// set the vars used by following logic | ||
// further refactor should also clean up the vars used below | ||
wtFailedCause = workflowTaskHandler.workflowTaskFailedCause | ||
|
@@ -501,7 +507,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
|
||
newMutableState = workflowTaskHandler.newMutableState | ||
|
||
hasUnhandledEvents = workflowTaskHandler.hasBufferedEvents | ||
hasBufferedEvents = workflowTaskHandler.hasBufferedEvents | ||
} | ||
|
||
if wtFailedCause != nil { | ||
|
@@ -522,7 +528,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
if err != nil { | ||
return nil, err | ||
} | ||
hasUnhandledEvents = true | ||
hasBufferedEvents = true | ||
newMutableState = nil | ||
|
||
if wtFailedCause.workflowFailure != nil { | ||
|
@@ -532,24 +538,37 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
if _, err := ms.AddFailWorkflowEvent(nextEventBatchId, enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE, attributes, ""); err != nil { | ||
return nil, err | ||
} | ||
hasUnhandledEvents = false | ||
hasBufferedEvents = false | ||
} | ||
} | ||
|
||
createNewWorkflowTask := ms.IsWorkflowExecutionRunning() && (hasUnhandledEvents || request.GetForceCreateNewWorkflowTask() || activityNotStartedCancelled) | ||
newWorkflowTaskType := enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED | ||
if ms.IsWorkflowExecutionRunning() && (hasBufferedEvents || request.GetForceCreateNewWorkflowTask() || activityNotStartedCancelled) { | ||
newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL | ||
} | ||
createNewWorkflowTask := newWorkflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED | ||
|
||
Comment on lines
+545
to
+550
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This block looks little weird but there are more cases to come in #3822. |
||
var newWorkflowTaskScheduledEventID int64 | ||
if createNewWorkflowTask { | ||
// TODO (alex-update): Need to support case when ReturnNewWorkflowTask=false and WT.Type=Speculative. | ||
// In this case WT needs to be added directly to matching. | ||
// Current implementation will create normal WT. | ||
bypassTaskGeneration := request.GetReturnNewWorkflowTask() && wtFailedCause == nil | ||
if !bypassTaskGeneration { | ||
// If task generation can't be bypassed workflow task must be of Normal type because Speculative workflow task always skip task generation. | ||
newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL | ||
} | ||
|
||
var newWorkflowTask *workflow.WorkflowTaskInfo | ||
var err error | ||
if workflowTaskHeartbeating && !workflowTaskHeartbeatTimeout { | ||
newWorkflowTask, err = ms.AddWorkflowTaskScheduledEventAsHeartbeat( | ||
bypassTaskGeneration, | ||
currentWorkflowTask.OriginalScheduledTime, | ||
enumsspb.WORKFLOW_TASK_TYPE_NORMAL, | ||
enumsspb.WORKFLOW_TASK_TYPE_NORMAL, // Heartbeat workflow task is always of Normal type. | ||
) | ||
} else { | ||
newWorkflowTask, err = ms.AddWorkflowTaskScheduledEvent(bypassTaskGeneration, enumsspb.WORKFLOW_TASK_TYPE_NORMAL) | ||
newWorkflowTask, err = ms.AddWorkflowTaskScheduledEvent(bypassTaskGeneration, newWorkflowTaskType) | ||
} | ||
if err != nil { | ||
return nil, err | ||
|
@@ -661,7 +680,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( | |
} | ||
|
||
return resp, nil | ||
|
||
} | ||
|
||
func (handler *workflowTaskHandlerCallbacksImpl) verifyFirstWorkflowTaskScheduled( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will have real code in #3822.