Skip to content

Commit 0bb6884

Browse files
authored
Handle namespace not found case in redirection interceptor (#3947)
1 parent 868b2d1 commit 0bb6884

12 files changed

+92
-17
lines changed

common/rpc/interceptor/caller_info.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (i *CallerInfoInterceptor) Intercept(
5959

6060
updateInfo := false
6161
if callerInfo.CallerName == "" {
62-
callerInfo.CallerName = string(GetNamespace(i.namespaceRegistry, req))
62+
callerInfo.CallerName = string(MustGetNamespaceName(i.namespaceRegistry, req))
6363
updateInfo = callerInfo.CallerName != ""
6464
}
6565
if callerInfo.CallerType == "" {

common/rpc/interceptor/namespace.go

+23-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
package interceptor
2626

2727
import (
28+
"fmt"
29+
30+
"go.temporal.io/api/serviceerror"
2831
"go.temporal.io/server/common/namespace"
2932
)
3033

@@ -40,28 +43,42 @@ type (
4043
}
4144
)
4245

43-
func GetNamespace(
46+
// MustGetNamespaceName returns request namespace name
47+
// or EmptyName if there's error when retriving namespace name,
48+
// e.g. unable to find namespace
49+
func MustGetNamespaceName(
4450
namespaceRegistry namespace.Registry,
4551
req interface{},
4652
) namespace.Name {
53+
namespaceName, err := GetNamespaceName(namespaceRegistry, req)
54+
if err != nil {
55+
return namespace.EmptyName
56+
}
57+
return namespaceName
58+
}
59+
60+
func GetNamespaceName(
61+
namespaceRegistry namespace.Registry,
62+
req interface{},
63+
) (namespace.Name, error) {
4764
switch request := req.(type) {
4865
case NamespaceNameGetter:
4966
namespaceName := namespace.Name(request.GetNamespace())
5067
_, err := namespaceRegistry.GetNamespace(namespaceName)
5168
if err != nil {
52-
return namespace.EmptyName
69+
return namespace.EmptyName, err
5370
}
54-
return namespaceName
71+
return namespaceName, nil
5572

5673
case NamespaceIDGetter:
5774
namespaceID := namespace.ID(request.GetNamespaceId())
5875
namespaceName, err := namespaceRegistry.GetNamespaceName(namespaceID)
5976
if err != nil {
60-
return namespace.EmptyName
77+
return namespace.EmptyName, err
6178
}
62-
return namespaceName
79+
return namespaceName, nil
6380

6481
default:
65-
return namespace.EmptyName
82+
return namespace.EmptyName, serviceerror.NewInternal(fmt.Sprintf("unable to extract namespace info from request: %+v", req))
6683
}
6784
}

common/rpc/interceptor/namespace_count_limit.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (ni *NamespaceCountLimitInterceptor) Intercept(
8282
// token will default to 0
8383
token := ni.tokens[methodName]
8484
if token != 0 {
85-
nsName := GetNamespace(ni.namespaceRegistry, req)
85+
nsName := MustGetNamespaceName(ni.namespaceRegistry, req)
8686
counter := ni.counter(nsName, methodName)
8787
count := atomic.AddInt32(counter, int32(token))
8888
defer atomic.AddInt32(counter, -int32(token))

common/rpc/interceptor/namespace_logger.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (nli *NamespaceLogInterceptor) Intercept(
6363

6464
if nli.logger != nil {
6565
_, methodName := SplitMethodName(info.FullMethod)
66-
namespace := GetNamespace(nli.namespaceRegistry, req)
66+
namespace := MustGetNamespaceName(nli.namespaceRegistry, req)
6767
tlsInfo := authorization.TLSInfoFormContext(ctx)
6868
var serverName string
6969
var certThumbprint string

common/rpc/interceptor/namespace_rate_limit.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (ni *NamespaceRateLimitInterceptor) Intercept(
7878
token = NamespaceRateLimitDefaultToken
7979
}
8080

81-
namespace := GetNamespace(ni.namespaceRegistry, req)
81+
namespace := MustGetNamespaceName(ni.namespaceRegistry, req)
8282
if !ni.rateLimiter.Allow(time.Now().UTC(), quotas.NewRequest(
8383
methodName,
8484
token,

common/rpc/interceptor/namespace_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (s *namespaceSuite) TestGetNamespace() {
190190
}
191191

192192
for _, testCase := range testCases {
193-
extractedNamespace := GetNamespace(register, testCase.method)
193+
extractedNamespace := MustGetNamespaceName(register, testCase.method)
194194
s.Equal(testCase.namespaceName, extractedNamespace)
195195
}
196196
}

common/rpc/interceptor/telemetry.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (ti *TelemetryInterceptor) metricsHandlerLogTags(
238238

239239
overridedMethodName := ti.overrideOperationTag(fullMethod, methodName, req)
240240

241-
nsName := GetNamespace(ti.namespaceRegistry, req)
241+
nsName := MustGetNamespaceName(ti.namespaceRegistry, req)
242242
if nsName == "" {
243243
return ti.metricsHandler.WithTags(metrics.OperationTag(overridedMethodName), metrics.NamespaceUnknownTag()),
244244
[]tag.Tag{tag.Operation(overridedMethodName)}

service/frontend/redirection_interceptor.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,10 @@ func (i *RedirectionInterceptor) Intercept(
177177
return i.handleLocalAPIInvocation(ctx, req, handler, methodName)
178178
}
179179
if raFn, ok := globalAPIResponses[methodName]; ok {
180-
namespaceName := interceptor.GetNamespace(i.namespaceCache, req)
180+
namespaceName, err := interceptor.GetNamespaceName(i.namespaceCache, req)
181+
if err != nil {
182+
return nil, err
183+
}
181184
return i.handleRedirectAPIInvocation(ctx, req, info, handler, methodName, raFn, namespaceName)
182185
}
183186

service/frontend/redirection_interceptor_test.go

+26-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/google/uuid"
3434
"github.com/stretchr/testify/require"
3535
"github.com/stretchr/testify/suite"
36+
"go.temporal.io/api/serviceerror"
3637
"go.temporal.io/api/workflowservice/v1"
3738
"google.golang.org/grpc"
3839

@@ -258,7 +259,7 @@ func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_Local() {
258259
s.True(functionInvoked)
259260
}
260261

261-
func (s *redirectionInterceptorSuite) TestHandleLocalAPIInvocation_Redirect() {
262+
func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_Redirect() {
262263
ctx := context.Background()
263264
req := &workflowservice.SignalWithStartWorkflowExecutionRequest{}
264265
info := &grpc.UnaryServerInfo{
@@ -300,6 +301,30 @@ func (s *redirectionInterceptorSuite) TestHandleLocalAPIInvocation_Redirect() {
300301
s.IsType(&workflowservice.SignalWithStartWorkflowExecutionResponse{}, resp)
301302
}
302303

304+
func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_NamespaceNotFound() {
305+
ctx := context.Background()
306+
req := &workflowservice.PollWorkflowTaskQueueRequest{}
307+
info := &grpc.UnaryServerInfo{
308+
FullMethod: "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowTaskQueue",
309+
}
310+
311+
namespaceName := namespace.Name("unknown_namespace")
312+
s.namespaceCache.EXPECT().GetNamespace(namespaceName).Return(nil, &serviceerror.NamespaceNotFound{}).AnyTimes()
313+
methodName := "PollWorkflowTaskQueue"
314+
315+
resp, err := s.redirector.handleRedirectAPIInvocation(
316+
ctx,
317+
req,
318+
info,
319+
nil,
320+
methodName,
321+
globalAPIResponses[methodName],
322+
namespaceName,
323+
)
324+
s.Nil(resp)
325+
s.IsType(&serviceerror.NamespaceNotFound{}, err)
326+
}
327+
303328
type (
304329
mockClientConnInterface struct {
305330
*suite.Suite

tests/activity_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,10 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Success() {
134134
for i := 0; i < 10; i++ {
135135
s.Logger.Info("Heartbeating for activity", tag.WorkflowActivityID(activityID), tag.Counter(i))
136136
_, err := s.engine.RecordActivityTaskHeartbeat(NewContext(), &workflowservice.RecordActivityTaskHeartbeatRequest{
137-
TaskToken: taskToken, Details: payloads.EncodeString("details")})
137+
Namespace: s.namespace,
138+
TaskToken: taskToken,
139+
Details: payloads.EncodeString("details"),
140+
})
138141
s.NoError(err)
139142
time.Sleep(10 * time.Millisecond)
140143
}
@@ -685,7 +688,10 @@ func (s *integrationSuite) TestTryActivityCancellationFromWorkflow() {
685688
s.Logger.Info("Heartbeating for activity", tag.WorkflowActivityID(activityID), tag.Counter(i))
686689
response, err := s.engine.RecordActivityTaskHeartbeat(NewContext(),
687690
&workflowservice.RecordActivityTaskHeartbeatRequest{
688-
TaskToken: taskToken, Details: payloads.EncodeString("details")})
691+
Namespace: s.namespace,
692+
TaskToken: taskToken,
693+
Details: payloads.EncodeString("details"),
694+
})
689695
if response != nil && response.CancelRequested {
690696
activityCanceled = true
691697
return payloads.EncodeString("Activity Cancelled"), true, nil

tests/taskpoller.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,10 @@ Loop:
230230
if response.Query != nil {
231231
blob, err := p.QueryHandler(response)
232232

233-
completeRequest := &workflowservice.RespondQueryTaskCompletedRequest{TaskToken: response.TaskToken}
233+
completeRequest := &workflowservice.RespondQueryTaskCompletedRequest{
234+
Namespace: p.Namespace,
235+
TaskToken: response.TaskToken,
236+
}
234237
if err != nil {
235238
completeType := enumspb.QUERY_RESULT_TYPE_FAILED
236239
completeRequest.CompletedType = completeType
@@ -252,6 +255,7 @@ Loop:
252255
if err != nil {
253256
p.Logger.Error("Failing workflow task. Workflow messages handler failed with error", tag.Error(err))
254257
_, err = p.Engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
258+
Namespace: p.Namespace,
255259
TaskToken: response.TaskToken,
256260
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE,
257261
Failure: newApplicationFailure(err, false, nil),
@@ -276,6 +280,7 @@ Loop:
276280
if err != nil {
277281
p.Logger.Error("Failing workflow task. Workflow task handler failed with error", tag.Error(err))
278282
_, err = p.Engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
283+
Namespace: p.Namespace,
279284
TaskToken: response.TaskToken,
280285
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE,
281286
Failure: newApplicationFailure(err, false, nil),
@@ -293,6 +298,7 @@ Loop:
293298
if !respondStickyTaskQueue {
294299
// non sticky taskqueue
295300
newTask, err := p.Engine.RespondWorkflowTaskCompleted(NewContext(), &workflowservice.RespondWorkflowTaskCompletedRequest{
301+
Namespace: p.Namespace,
296302
TaskToken: response.TaskToken,
297303
Identity: p.Identity,
298304
Commands: commands,
@@ -307,6 +313,7 @@ Loop:
307313
newTask, err := p.Engine.RespondWorkflowTaskCompleted(
308314
NewContext(),
309315
&workflowservice.RespondWorkflowTaskCompletedRequest{
316+
Namespace: p.Namespace,
310317
TaskToken: response.TaskToken,
311318
Identity: p.Identity,
312319
Commands: commands,
@@ -354,6 +361,7 @@ func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWor
354361
if err != nil {
355362
p.Logger.Error("Failing workflow task. Workflow messages handler failed with error", tag.Error(err))
356363
_, err = p.Engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
364+
Namespace: p.Namespace,
357365
TaskToken: response.TaskToken,
358366
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE,
359367
Failure: newApplicationFailure(err, false, nil),
@@ -368,6 +376,7 @@ func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWor
368376
if err != nil {
369377
p.Logger.Error("Failing workflow task. Workflow task handler failed with error", tag.Error(err))
370378
_, err = p.Engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
379+
Namespace: p.Namespace,
371380
TaskToken: response.TaskToken,
372381
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE,
373382
Failure: newApplicationFailure(err, false, nil),
@@ -386,6 +395,7 @@ func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWor
386395
newTask, err := p.Engine.RespondWorkflowTaskCompleted(
387396
NewContext(),
388397
&workflowservice.RespondWorkflowTaskCompletedRequest{
398+
Namespace: p.Namespace,
389399
TaskToken: response.TaskToken,
390400
Identity: p.Identity,
391401
Commands: commands,
@@ -437,6 +447,7 @@ retry:
437447
if cancel {
438448
p.Logger.Info("Executing RespondActivityTaskCanceled")
439449
_, err := p.Engine.RespondActivityTaskCanceled(NewContext(), &workflowservice.RespondActivityTaskCanceledRequest{
450+
Namespace: p.Namespace,
440451
TaskToken: response.TaskToken,
441452
Details: payloads.EncodeString("details"),
442453
Identity: p.Identity,
@@ -446,6 +457,7 @@ retry:
446457

447458
if err2 != nil {
448459
_, err := p.Engine.RespondActivityTaskFailed(NewContext(), &workflowservice.RespondActivityTaskFailedRequest{
460+
Namespace: p.Namespace,
449461
TaskToken: response.TaskToken,
450462
Failure: newApplicationFailure(err2, false, nil),
451463
Identity: p.Identity,
@@ -454,6 +466,7 @@ retry:
454466
}
455467

456468
_, err = p.Engine.RespondActivityTaskCompleted(NewContext(), &workflowservice.RespondActivityTaskCompletedRequest{
469+
Namespace: p.Namespace,
457470
TaskToken: response.TaskToken,
458471
Identity: p.Identity,
459472
Result: result,

tests/workflow_task_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func (s *integrationSuite) TestWorkflowTaskHeartbeatingWithEmptyResult() {
9595
hbTimeout := 0
9696
for i := 0; i < 12; i++ {
9797
resp2, err2 := s.engine.RespondWorkflowTaskCompleted(NewContext(), &workflowservice.RespondWorkflowTaskCompletedRequest{
98+
Namespace: s.namespace,
9899
TaskToken: taskToken,
99100
Commands: []*commandpb.Command{},
100101
StickyAttributes: &taskqueuepb.StickyExecutionAttributes{
@@ -125,6 +126,7 @@ func (s *integrationSuite) TestWorkflowTaskHeartbeatingWithEmptyResult() {
125126
s.Equal(2, hbTimeout)
126127

127128
resp5, err5 := s.engine.RespondWorkflowTaskCompleted(NewContext(), &workflowservice.RespondWorkflowTaskCompletedRequest{
129+
Namespace: s.namespace,
128130
TaskToken: taskToken,
129131
Commands: []*commandpb.Command{
130132
{
@@ -198,6 +200,7 @@ func (s *integrationSuite) TestWorkflowTaskHeartbeatingWithLocalActivitiesResult
198200
s.assertLastHistoryEvent(we, 3, enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED)
199201

200202
resp2, err2 := s.engine.RespondWorkflowTaskCompleted(NewContext(), &workflowservice.RespondWorkflowTaskCompletedRequest{
203+
Namespace: s.namespace,
201204
TaskToken: resp1.GetTaskToken(),
202205
Commands: []*commandpb.Command{},
203206
StickyAttributes: &taskqueuepb.StickyExecutionAttributes{
@@ -210,6 +213,7 @@ func (s *integrationSuite) TestWorkflowTaskHeartbeatingWithLocalActivitiesResult
210213
s.NoError(err2)
211214

212215
resp3, err3 := s.engine.RespondWorkflowTaskCompleted(NewContext(), &workflowservice.RespondWorkflowTaskCompletedRequest{
216+
Namespace: s.namespace,
213217
TaskToken: resp2.WorkflowTask.GetTaskToken(),
214218
Commands: []*commandpb.Command{
215219
{
@@ -231,6 +235,7 @@ func (s *integrationSuite) TestWorkflowTaskHeartbeatingWithLocalActivitiesResult
231235
s.NoError(err3)
232236

233237
resp4, err4 := s.engine.RespondWorkflowTaskCompleted(NewContext(), &workflowservice.RespondWorkflowTaskCompletedRequest{
238+
Namespace: s.namespace,
234239
TaskToken: resp3.WorkflowTask.GetTaskToken(),
235240
Commands: []*commandpb.Command{
236241
{
@@ -252,6 +257,7 @@ func (s *integrationSuite) TestWorkflowTaskHeartbeatingWithLocalActivitiesResult
252257
s.NoError(err4)
253258

254259
resp5, err5 := s.engine.RespondWorkflowTaskCompleted(NewContext(), &workflowservice.RespondWorkflowTaskCompletedRequest{
260+
Namespace: s.namespace,
255261
TaskToken: resp4.WorkflowTask.GetTaskToken(),
256262
Commands: []*commandpb.Command{
257263
{
@@ -496,6 +502,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularWorkflowTask
496502

497503
// fail this workflow task to flush buffer, and then another workflow task will be scheduled
498504
_, err2 := s.engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
505+
Namespace: s.namespace,
499506
TaskToken: resp1.GetTaskToken(),
500507
Cause: cause,
501508
Identity: "integ test",
@@ -573,6 +580,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalBeforeTransientWorkflowT
573580
}
574581

575582
_, err2 := s.engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
583+
Namespace: s.namespace,
576584
TaskToken: resp1.GetTaskToken(),
577585
Cause: cause,
578586
Identity: "integ test",
@@ -676,6 +684,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientWorkflowTa
676684
}
677685

678686
_, err2 := s.engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
687+
Namespace: s.namespace,
679688
TaskToken: resp1.GetTaskToken(),
680689
Cause: cause,
681690
Identity: "integ test",
@@ -776,6 +785,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientWorkflowTa
776785
}
777786

778787
_, err2 := s.engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
788+
Namespace: s.namespace,
779789
TaskToken: resp1.GetTaskToken(),
780790
Cause: cause,
781791
Identity: "integ test",
@@ -809,6 +819,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientWorkflowTa
809819

810820
// fail this workflow task to flush buffer
811821
_, err2 := s.engine.RespondWorkflowTaskFailed(NewContext(), &workflowservice.RespondWorkflowTaskFailedRequest{
822+
Namespace: s.namespace,
812823
TaskToken: resp1.GetTaskToken(),
813824
Cause: cause,
814825
Identity: "integ test",

0 commit comments

Comments
 (0)