Skip to content

Commit 9e37fec

Browse files
authored
Refactor failWorkflowTask method (#3915)
1 parent 9186ca7 commit 9e37fec

File tree

3 files changed

+63
-50
lines changed

3 files changed

+63
-50
lines changed

service/history/historyEngine_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1586,7 +1586,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedBadBinary() {
15861586
})
15871587
s.Error(err)
15881588
s.IsType(&serviceerror.InvalidArgument{}, err)
1589-
s.Equal("BadBinary: binary test-bad-binary is already marked as bad deployment", err.Error())
1589+
s.Equal("BadBinary: binary test-bad-binary is marked as bad deployment", err.Error())
15901590

15911591
s.NotNil(updatedWorkflowMutation)
15921592
s.Equal(int64(5), updatedWorkflowMutation.NextEventID)

service/history/workflowTaskHandler.go

+55-48
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
332332
return nil, handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES, err)
333333
}
334334
if err := handler.sizeLimitChecker.checkIfNumPendingActivitiesExceedsLimit(); err != nil {
335-
return nil, handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err)
335+
return nil, handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err)
336336
}
337337

338338
enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind)
@@ -349,10 +349,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
349349
eagerStartActivity,
350350
)
351351
if err != nil {
352-
if _, ok := err.(*serviceerror.InvalidArgument); ok {
353-
return nil, handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err)
354-
}
355-
return nil, err
352+
return nil, handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err)
356353
}
357354

358355
if !eagerStartActivity {
@@ -473,10 +470,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelActivity(
473470
handler.identity,
474471
)
475472
if err != nil {
476-
if _, ok := err.(*serviceerror.InvalidArgument); ok {
477-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES, err)
478-
}
479-
return err
473+
return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES, err)
480474
}
481475
if ai != nil {
482476
// If ai is nil, the activity has already been canceled/completed/timedout. The cancel request
@@ -518,10 +512,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartTimer(
518512

519513
_, _, err := handler.mutableState.AddTimerStartedEvent(handler.workflowTaskCompletedID, attr)
520514
if err != nil {
521-
if _, ok := err.(*serviceerror.InvalidArgument); ok {
522-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_START_TIMER_DUPLICATE_ID, err)
523-
}
524-
return err
515+
return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_START_TIMER_DUPLICATE_ID, err)
525516
}
526517
return nil
527518
}
@@ -534,11 +525,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandCompleteWorkflow(
534525
handler.metricsHandler.Counter(metrics.CommandTypeCompleteWorkflowCounter.GetMetricName()).Record(1)
535526

536527
if handler.hasBufferedEvents {
537-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
528+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
538529
}
539530

540531
if handler.hasPendingUpdates {
541-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
532+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
542533
}
543534

544535
if err := handler.validateCommandAttr(
@@ -596,11 +587,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandFailWorkflow(
596587
handler.metricsHandler.Counter(metrics.CommandTypeFailWorkflowCounter.GetMetricName()).Record(1)
597588

598589
if handler.hasBufferedEvents {
599-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
590+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
600591
}
601592

602593
if handler.hasPendingUpdates {
603-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
594+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
604595
}
605596

606597
if err := handler.validateCommandAttr(
@@ -685,10 +676,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelTimer(
685676
attr,
686677
handler.identity)
687678
if err != nil {
688-
if _, ok := err.(*serviceerror.InvalidArgument); ok {
689-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CANCEL_TIMER_ATTRIBUTES, err)
690-
}
691-
return err
679+
return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CANCEL_TIMER_ATTRIBUTES, err)
692680
}
693681

694682
// timer deletion is a success, we may have deleted a fired timer in
@@ -707,11 +695,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelWorkflow(
707695
handler.metricsHandler.Counter(metrics.CommandTypeCancelWorkflowCounter.GetMetricName()).Record(1)
708696

709697
if handler.hasBufferedEvents {
710-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
698+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
711699
}
712700

713701
if handler.hasPendingUpdates {
714-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
702+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
715703
}
716704

717705
if err := handler.validateCommandAttr(
@@ -768,7 +756,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl
768756
return err
769757
}
770758
if err := handler.sizeLimitChecker.checkIfNumPendingCancelRequestsExceedsLimit(); err != nil {
771-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, err)
759+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, err)
772760
}
773761

774762
cancelRequestID := uuid.New()
@@ -814,11 +802,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandContinueAsNewWorkflow(
814802
handler.metricsHandler.Counter(metrics.CommandTypeContinueAsNewCounter.GetMetricName()).Record(1)
815803

816804
if handler.hasBufferedEvents {
817-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
805+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
818806
}
819807

820808
if handler.hasPendingUpdates {
821-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
809+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
822810
}
823811

824812
namespaceName := handler.mutableState.GetNamespaceEntry().Name()
@@ -991,7 +979,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow(
991979

992980
// child workflow limit
993981
if err := handler.sizeLimitChecker.checkIfNumChildWorkflowsExceedsLimit(); err != nil {
994-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err)
982+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err)
995983
}
996984

997985
enabled := handler.config.EnableParentClosePolicy(parentNamespace.String())
@@ -1044,7 +1032,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandSignalExternalWorkflow(
10441032
return err
10451033
}
10461034
if err := handler.sizeLimitChecker.checkIfNumPendingSignalsExceedsLimit(); err != nil {
1047-
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err)
1035+
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err)
10481036
}
10491037

10501038
if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
@@ -1423,49 +1411,68 @@ func (handler *workflowTaskHandlerImpl) validateCommandAttr(
14231411
validationFn commandAttrValidationFn,
14241412
) error {
14251413

1426-
if failedCause, err := validationFn(); err != nil {
1427-
if _, ok := err.(*serviceerror.InvalidArgument); ok {
1428-
return handler.failCommand(failedCause, err)
1429-
}
1414+
return handler.failWorkflowTaskOnInvalidArgument(validationFn())
1415+
}
1416+
1417+
func (handler *workflowTaskHandlerImpl) failWorkflowTaskOnInvalidArgument(
1418+
wtFailedCause enumspb.WorkflowTaskFailedCause,
1419+
err error,
1420+
) error {
1421+
1422+
switch err.(type) {
1423+
case *serviceerror.InvalidArgument:
1424+
return handler.failWorkflowTask(wtFailedCause, err)
1425+
default:
14301426
return err
14311427
}
1432-
1433-
return nil
14341428
}
14351429

1436-
func (handler *workflowTaskHandlerImpl) failCommand(
1430+
func (handler *workflowTaskHandlerImpl) failWorkflowTask(
14371431
failedCause enumspb.WorkflowTaskFailedCause,
14381432
causeErr error,
14391433
) error {
1440-
handler.workflowTaskFailedCause = NewWorkflowTaskFailedCause(failedCause, causeErr)
1434+
1435+
handler.workflowTaskFailedCause = newWorkflowTaskFailedCause(
1436+
failedCause,
1437+
causeErr,
1438+
nil)
14411439
handler.stopProcessing = true
1440+
// NOTE: failWorkflowTask always return nil.
1441+
// It is important to clear returned error if WT needs to be failed to properly add WTFailed event.
1442+
// Handler will rely on stopProcessing flag and workflowTaskFailedCause field.
14421443
return nil
14431444
}
14441445

14451446
func (handler *workflowTaskHandlerImpl) failWorkflow(
14461447
failedCause enumspb.WorkflowTaskFailedCause,
14471448
causeErr error,
14481449
) error {
1449-
handler.workflowTaskFailedCause = &workflowTaskFailedCause{
1450-
failedCause: failedCause,
1451-
causeErr: causeErr,
1452-
workflowFailure: failure.NewServerFailure(causeErr.Error(), true),
1453-
}
1450+
1451+
handler.workflowTaskFailedCause = newWorkflowTaskFailedCause(
1452+
failedCause,
1453+
causeErr,
1454+
failure.NewServerFailure(causeErr.Error(), true))
14541455
handler.stopProcessing = true
1456+
// NOTE: failWorkflow always return nil.
1457+
// It is important to clear returned error if WT needs to be failed to properly add WTFailed and FailWorkflow events.
1458+
// Handler will rely on stopProcessing flag and workflowTaskFailedCause field.
14551459
return nil
14561460
}
14571461

1458-
func NewWorkflowTaskFailedCause(failedCause enumspb.WorkflowTaskFailedCause, causeErr error) *workflowTaskFailedCause {
1462+
func newWorkflowTaskFailedCause(failedCause enumspb.WorkflowTaskFailedCause, causeErr error, workflowFailure *failurepb.Failure) *workflowTaskFailedCause {
1463+
14591464
return &workflowTaskFailedCause{
1460-
failedCause: failedCause,
1461-
causeErr: causeErr,
1465+
failedCause: failedCause,
1466+
causeErr: causeErr,
1467+
workflowFailure: workflowFailure,
14621468
}
14631469
}
14641470

1465-
func (wtfc *workflowTaskFailedCause) Message() string {
1466-
if wtfc.causeErr == nil {
1467-
return wtfc.failedCause.String()
1471+
func (c *workflowTaskFailedCause) Message() string {
1472+
1473+
if c.causeErr == nil {
1474+
return c.failedCause.String()
14681475
}
14691476

1470-
return fmt.Sprintf("%v: %v", wtfc.failedCause, wtfc.causeErr.Error())
1477+
return fmt.Sprintf("%v: %v", c.failedCause, c.causeErr.Error())
14711478
}

service/history/workflowTaskHandlerCallbacks.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,13 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
453453
hasPendingUpdates := ms.UpdateRegistry().HasPending(request.GetMessages())
454454
hasBufferedEvents := ms.HasBufferedEvents()
455455
if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil {
456-
wtFailedCause = NewWorkflowTaskFailedCause(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY, serviceerror.NewInvalidArgument(fmt.Sprintf("binary %v is already marked as bad deployment", request.GetBinaryChecksum())))
456+
wtFailedCause = newWorkflowTaskFailedCause(
457+
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY,
458+
serviceerror.NewInvalidArgument(
459+
fmt.Sprintf(
460+
"binary %v is marked as bad deployment",
461+
request.GetBinaryChecksum())),
462+
nil)
457463
} else {
458464
namespace := namespaceEntry.Name()
459465
workflowSizeChecker := newWorkflowSizeChecker(

0 commit comments

Comments
 (0)