Skip to content

Commit 3cb9232

Browse files
authored
Clean namespace handover (#3692)
1 parent 676b744 commit 3cb9232

20 files changed

+290
-45
lines changed

common/metrics/metric_defs.go

+1
Original file line numberDiff line numberDiff line change
@@ -1430,6 +1430,7 @@ var (
14301430
TaskWorkflowBusyCounter = NewCounterDef("task_errors_workflow_busy")
14311431
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
14321432
TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter")
1433+
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
14331434
TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency")
14341435
TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter")
14351436
TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter")

common/rpc/interceptor/namespace_validator.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ package interceptor
2626

2727
import (
2828
"context"
29-
"fmt"
3029

3130
enumspb "go.temporal.io/api/enums/v1"
3231
"go.temporal.io/api/serviceerror"
@@ -49,9 +48,8 @@ type (
4948
)
5049

5150
var (
52-
ErrNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.")
51+
errNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.")
5352
errNamespaceTooLong = serviceerror.NewInvalidArgument("Namespace length exceeds limit.")
54-
errNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String()))
5553
errTaskTokenNotSet = serviceerror.NewInvalidArgument("Task token not set on request.")
5654
errTaskTokenNamespaceMismatch = serviceerror.NewInvalidArgument("Operation requested with a token from a different namespace.")
5755

@@ -170,20 +168,20 @@ func (ni *NamespaceValidatorInterceptor) extractNamespaceFromRequest(req interfa
170168
// Special case for DescribeNamespace API which should read namespace directly from database.
171169
// Therefore, it must bypass namespace registry and validator.
172170
if request.GetId() == "" && namespaceName.IsEmpty() {
173-
return nil, ErrNamespaceNotSet
171+
return nil, errNamespaceNotSet
174172
}
175173
return nil, nil
176174
case *workflowservice.RegisterNamespaceRequest:
177175
// Special case for RegisterNamespace API. `namespaceName` is name of namespace that about to be registered.
178176
// There is no namespace entry for it, therefore, it must bypass namespace registry and validator.
179177
if namespaceName.IsEmpty() {
180-
return nil, ErrNamespaceNotSet
178+
return nil, errNamespaceNotSet
181179
}
182180
return nil, nil
183181
default:
184182
// All other APIs.
185183
if namespaceName.IsEmpty() {
186-
return nil, ErrNamespaceNotSet
184+
return nil, errNamespaceNotSet
187185
}
188186
return ni.namespaceRegistry.GetNamespace(namespaceName)
189187
}
@@ -215,7 +213,7 @@ func (ni *NamespaceValidatorInterceptor) extractNamespaceFromTaskToken(req inter
215213
}
216214

217215
if namespaceID.IsEmpty() {
218-
return nil, ErrNamespaceNotSet
216+
return nil, errNamespaceNotSet
219217
}
220218
return ni.namespaceRegistry.GetNamespaceByID(namespaceID)
221219
}
@@ -266,5 +264,5 @@ func (ni *NamespaceValidatorInterceptor) checkReplicationState(namespaceEntry *n
266264
return nil
267265
}
268266

269-
return errNamespaceHandover
267+
return common.ErrNamespaceHandover
270268
}

common/rpc/interceptor/namespace_validator_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,14 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp
200200
{
201201
state: enumspb.NAMESPACE_STATE_REGISTERED,
202202
replicationState: enumspb.REPLICATION_STATE_HANDOVER,
203-
expectedErr: errNamespaceHandover,
203+
expectedErr: common.ErrNamespaceHandover,
204204
method: "/temporal/StartWorkflowExecution",
205205
req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"},
206206
},
207207
// DescribeNamespace
208208
{
209209
state: enumspb.NAMESPACE_STATE_UNSPECIFIED,
210-
expectedErr: ErrNamespaceNotSet,
210+
expectedErr: errNamespaceNotSet,
211211
method: "/temporal/DescribeNamespace",
212212
req: &workflowservice.DescribeNamespaceRequest{},
213213
},
@@ -232,7 +232,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp
232232
},
233233
{
234234
state: enumspb.NAMESPACE_STATE_UNSPECIFIED,
235-
expectedErr: ErrNamespaceNotSet,
235+
expectedErr: errNamespaceNotSet,
236236
method: "/temporal/RegisterNamespace",
237237
req: &workflowservice.RegisterNamespaceRequest{},
238238
},

common/util.go

+9
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ var (
150150
ErrContextTimeoutNotSet = serviceerror.NewInvalidArgument("Context timeout is not set.")
151151
)
152152

153+
var (
154+
// ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request.
155+
ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String()))
156+
)
157+
153158
// AwaitWaitGroup calls Wait on the given wait
154159
// Returns true if the Wait() call succeeded before the timeout
155160
// Returns false if the Wait() did not return before the timeout
@@ -336,6 +341,10 @@ func IsServiceClientTransientError(err error) bool {
336341
}
337342

338343
func IsServiceHandlerRetryableError(err error) bool {
344+
if err.Error() == ErrNamespaceHandover.Error() {
345+
return false
346+
}
347+
339348
switch err.(type) {
340349
case *serviceerror.Internal,
341350
*serviceerror.Unavailable:

service/history/consts/const.go

+2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ var (
9090
ErrWorkflowNotReady = serviceerror.NewWorkflowNotReady("Workflow state is not ready to handle the request.")
9191
// ErrWorkflowTaskNotScheduled is error indicating workflow task is not scheduled yet.
9292
ErrWorkflowTaskNotScheduled = serviceerror.NewWorkflowNotReady("Workflow task is not scheduled yet.")
93+
// ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request.
94+
ErrNamespaceHandover = common.ErrNamespaceHandover
9395

9496
// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
9597
// for start workflow execution API

service/history/historyEngine.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,7 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
370370
}
371371

372372
if e.shard.GetClusterMetadata().IsGlobalNamespaceEnabled() {
373-
maxTaskID, _ := e.replicationAckMgr.GetMaxTaskInfo()
374-
e.shard.UpdateHandoverNamespaces(nextNamespaces, maxTaskID)
373+
e.shard.UpdateHandoverNamespaces(nextNamespaces)
375374
}
376375

377376
newNotificationVersion := nextNamespaces[len(nextNamespaces)-1].NotificationVersion() + 1

service/history/nDCTaskUtil.go

+13
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package history
2727
import (
2828
"context"
2929

30+
enumspb "go.temporal.io/api/enums/v1"
3031
"go.temporal.io/api/serviceerror"
3132

3233
enumsspb "go.temporal.io/server/api/enums/v1"
@@ -210,3 +211,15 @@ func getNamespaceTagByID(
210211

211212
return metrics.NamespaceTag(namespaceName.String())
212213
}
214+
215+
func getNamespaceTagAndReplicationStateByID(
216+
registry namespace.Registry,
217+
namespaceID string,
218+
) (metrics.Tag, enumspb.ReplicationState) {
219+
namespace, err := registry.GetNamespaceByID(namespace.ID(namespaceID))
220+
if err != nil {
221+
return metrics.NamespaceUnknownTag(), enumspb.REPLICATION_STATE_UNSPECIFIED
222+
}
223+
224+
return metrics.NamespaceTag(namespace.Name().String()), namespace.ReplicationState()
225+
}

service/history/queues/executable.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,12 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
284284
return nil
285285
}
286286

287+
if err.Error() == consts.ErrNamespaceHandover.Error() {
288+
e.taggedMetricsHandler.Counter(metrics.TaskNamespaceHandoverCounter.GetMetricName()).Record(1)
289+
err = consts.ErrNamespaceHandover
290+
return err
291+
}
292+
287293
if _, ok := err.(*serviceerror.NamespaceNotActive); ok {
288294
// TODO remove this error check special case after multi-cursor is enabled by default,
289295
// since the new task life cycle will not give up until task processed / verified
@@ -325,7 +331,10 @@ func (e *executableImpl) IsRetryableError(err error) bool {
325331
// ErrTaskRetry means mutable state is not ready for standby task processing
326332
// there's no point for retrying the task immediately which will hold the worker corouinte
327333
// TODO: change ErrTaskRetry to a better name
328-
return err != consts.ErrTaskRetry && err != consts.ErrWorkflowBusy && err != consts.ErrDependencyTaskNotCompleted
334+
return err != consts.ErrTaskRetry &&
335+
err != consts.ErrWorkflowBusy &&
336+
err != consts.ErrDependencyTaskNotCompleted &&
337+
err != consts.ErrNamespaceHandover
329338
}
330339

331340
func (e *executableImpl) RetryPolicy() backoff.RetryPolicy {
@@ -449,7 +458,9 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
449458
return false
450459
}
451460

452-
return err != consts.ErrTaskRetry && err != consts.ErrDependencyTaskNotCompleted
461+
return err != consts.ErrTaskRetry &&
462+
err != consts.ErrDependencyTaskNotCompleted &&
463+
err != consts.ErrNamespaceHandover
453464
}
454465

455466
func (e *executableImpl) rescheduleTime(
@@ -459,12 +470,14 @@ func (e *executableImpl) rescheduleTime(
459470
// elapsedTime (the first parameter in ComputeNextDelay) is not relevant here
460471
// since reschedule policy has no expiration interval.
461472

462-
if err == consts.ErrTaskRetry {
473+
if err == consts.ErrTaskRetry || err == consts.ErrNamespaceHandover {
463474
// using a different reschedule policy to slow down retry
464-
// as the error means mutable state is not ready to handle the task,
475+
// as the error means mutable state or namespace is not ready to handle the task,
465476
// need to wait for replication.
466477
return e.timeSource.Now().Add(taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt))
467-
} else if err == consts.ErrDependencyTaskNotCompleted {
478+
}
479+
480+
if err == consts.ErrDependencyTaskNotCompleted {
468481
return e.timeSource.Now().Add(dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt))
469482
}
470483

service/history/shard/context.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ type (
104104

105105
GetNamespaceNotificationVersion() int64
106106
UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error
107-
UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace, maxRepTaskID int64)
107+
UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace)
108108

109109
AppendHistoryEvents(ctx context.Context, request *persistence.AppendHistoryNodesRequest, namespaceID namespace.ID, execution commonpb.WorkflowExecution) (int, error)
110110

0 commit comments

Comments
 (0)