Skip to content

Commit 75c05a2

Browse files
authored
Call RedirectionInterceptor only for WorkflowHandler (#3924)
1 parent d4498d1 commit 75c05a2

File tree

3 files changed

+87
-82
lines changed

3 files changed

+87
-82
lines changed

common/rpc/interceptor/api_name.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import "strings"
2828

2929
func SplitMethodName(
3030
fullMethodName string,
31-
) (_ string, _ string) {
31+
) (string, string) {
3232
fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
3333
if i := strings.Index(fullMethodName, "/"); i >= 0 {
3434
return fullMethodName[:i], fullMethodName[i+1:]

service/frontend/redirection_interceptor.go

+80-75
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ package frontend
2626

2727
import (
2828
"context"
29-
"fmt"
3029
"time"
3130

3231
"go.temporal.io/api/workflowservice/v1"
@@ -37,6 +36,7 @@ import (
3736
"go.temporal.io/server/common/cluster"
3837
"go.temporal.io/server/common/config"
3938
"go.temporal.io/server/common/log"
39+
"go.temporal.io/server/common/log/tag"
4040
"go.temporal.io/server/common/metrics"
4141
"go.temporal.io/server/common/namespace"
4242
"go.temporal.io/server/common/rpc/interceptor"
@@ -45,75 +45,75 @@ import (
4545
var (
4646
dcRedirectionMetricsPrefix = "DCRedirection"
4747

48-
localAPIResults = map[string]respAllocFn{
48+
localAPIResponses = map[string]responseConstructorFn{
4949
// Namespace APIs, namespace APIs does not require redirection
50-
"DeprecateNamespace": func() interface{} { return new(workflowservice.DeprecateNamespaceResponse) },
51-
"DescribeNamespace": func() interface{} { return new(workflowservice.DescribeNamespaceResponse) },
52-
"ListNamespaces": func() interface{} { return new(workflowservice.ListNamespacesResponse) },
53-
"RegisterNamespace": func() interface{} { return new(workflowservice.RegisterNamespaceResponse) },
54-
"UpdateNamespace": func() interface{} { return new(workflowservice.UpdateNamespaceResponse) },
50+
"DeprecateNamespace": func() any { return &workflowservice.DeprecateNamespaceResponse{} },
51+
"DescribeNamespace": func() any { return &workflowservice.DescribeNamespaceResponse{} },
52+
"ListNamespaces": func() any { return &workflowservice.ListNamespacesResponse{} },
53+
"RegisterNamespace": func() any { return &workflowservice.RegisterNamespaceResponse{} },
54+
"UpdateNamespace": func() any { return &workflowservice.UpdateNamespaceResponse{} },
5555

5656
// Cluster info APIs, Cluster info APIs does not require redirection
57-
"GetSearchAttributes": func() interface{} { return new(workflowservice.GetSearchAttributesResponse) },
58-
"GetClusterInfo": func() interface{} { return new(workflowservice.GetClusterInfoResponse) },
59-
"GetSystemInfo": func() interface{} { return new(workflowservice.GetSystemInfoResponse) },
57+
"GetSearchAttributes": func() any { return &workflowservice.GetSearchAttributesResponse{} },
58+
"GetClusterInfo": func() any { return &workflowservice.GetClusterInfoResponse{} },
59+
"GetSystemInfo": func() any { return &workflowservice.GetSystemInfoResponse{} },
6060
}
6161

62-
globalAPIResults = map[string]respAllocFn{
63-
"DescribeTaskQueue": func() interface{} { return new(workflowservice.DescribeTaskQueueResponse) },
64-
"DescribeWorkflowExecution": func() interface{} { return new(workflowservice.DescribeWorkflowExecutionResponse) },
65-
"GetWorkflowExecutionHistory": func() interface{} { return new(workflowservice.GetWorkflowExecutionHistoryResponse) },
66-
"GetWorkflowExecutionHistoryReverse": func() interface{} { return new(workflowservice.GetWorkflowExecutionHistoryReverseResponse) },
67-
"ListArchivedWorkflowExecutions": func() interface{} { return new(workflowservice.ListArchivedWorkflowExecutionsResponse) },
68-
"ListClosedWorkflowExecutions": func() interface{} { return new(workflowservice.ListClosedWorkflowExecutionsResponse) },
69-
"ListOpenWorkflowExecutions": func() interface{} { return new(workflowservice.ListOpenWorkflowExecutionsResponse) },
70-
"ListWorkflowExecutions": func() interface{} { return new(workflowservice.ListWorkflowExecutionsResponse) },
71-
"ScanWorkflowExecutions": func() interface{} { return new(workflowservice.ScanWorkflowExecutionsResponse) },
72-
"CountWorkflowExecutions": func() interface{} { return new(workflowservice.CountWorkflowExecutionsResponse) },
73-
"PollActivityTaskQueue": func() interface{} { return new(workflowservice.PollActivityTaskQueueResponse) },
74-
"PollWorkflowTaskQueue": func() interface{} { return new(workflowservice.PollWorkflowTaskQueueResponse) },
75-
"QueryWorkflow": func() interface{} { return new(workflowservice.QueryWorkflowResponse) },
76-
"RecordActivityTaskHeartbeat": func() interface{} { return new(workflowservice.RecordActivityTaskHeartbeatResponse) },
77-
"RecordActivityTaskHeartbeatById": func() interface{} { return new(workflowservice.RecordActivityTaskHeartbeatByIdResponse) },
78-
"RequestCancelWorkflowExecution": func() interface{} { return new(workflowservice.RequestCancelWorkflowExecutionResponse) },
79-
"ResetStickyTaskQueue": func() interface{} { return new(workflowservice.ResetStickyTaskQueueResponse) },
80-
"ResetWorkflowExecution": func() interface{} { return new(workflowservice.ResetWorkflowExecutionResponse) },
81-
"RespondActivityTaskCanceled": func() interface{} { return new(workflowservice.RespondActivityTaskCanceledResponse) },
82-
"RespondActivityTaskCanceledById": func() interface{} { return new(workflowservice.RespondActivityTaskCanceledByIdResponse) },
83-
"RespondActivityTaskCompleted": func() interface{} { return new(workflowservice.RespondActivityTaskCompletedResponse) },
84-
"RespondActivityTaskCompletedById": func() interface{} { return new(workflowservice.RespondActivityTaskCompletedByIdResponse) },
85-
"RespondActivityTaskFailed": func() interface{} { return new(workflowservice.RespondActivityTaskFailedResponse) },
86-
"RespondActivityTaskFailedById": func() interface{} { return new(workflowservice.RespondActivityTaskFailedByIdResponse) },
87-
"RespondWorkflowTaskCompleted": func() interface{} { return new(workflowservice.RespondWorkflowTaskCompletedResponse) },
88-
"RespondWorkflowTaskFailed": func() interface{} { return new(workflowservice.RespondWorkflowTaskFailedResponse) },
89-
"RespondQueryTaskCompleted": func() interface{} { return new(workflowservice.RespondQueryTaskCompletedResponse) },
90-
"SignalWithStartWorkflowExecution": func() interface{} { return new(workflowservice.SignalWithStartWorkflowExecutionResponse) },
91-
"SignalWorkflowExecution": func() interface{} { return new(workflowservice.SignalWorkflowExecutionResponse) },
92-
"StartWorkflowExecution": func() interface{} { return new(workflowservice.StartWorkflowExecutionResponse) },
93-
"UpdateWorkflowExecution": func() interface{} { return new(workflowservice.UpdateWorkflowExecutionResponse) },
94-
"TerminateWorkflowExecution": func() interface{} { return new(workflowservice.TerminateWorkflowExecutionResponse) },
95-
"DeleteWorkflowExecution": func() interface{} { return new(workflowservice.DeleteWorkflowExecutionResponse) },
96-
"ListTaskQueuePartitions": func() interface{} { return new(workflowservice.ListTaskQueuePartitionsResponse) },
97-
98-
"CreateSchedule": func() interface{} { return new(workflowservice.CreateScheduleResponse) },
99-
"DescribeSchedule": func() interface{} { return new(workflowservice.DescribeScheduleResponse) },
100-
"UpdateSchedule": func() interface{} { return new(workflowservice.UpdateScheduleResponse) },
101-
"PatchSchedule": func() interface{} { return new(workflowservice.PatchScheduleResponse) },
102-
"DeleteSchedule": func() interface{} { return new(workflowservice.DeleteScheduleResponse) },
103-
"ListSchedules": func() interface{} { return new(workflowservice.ListSchedulesResponse) },
104-
"ListScheduleMatchingTimes": func() interface{} { return new(workflowservice.ListScheduleMatchingTimesResponse) },
105-
"UpdateWorkerBuildIdOrdering": func() interface{} { return new(workflowservice.UpdateWorkerBuildIdOrderingResponse) },
106-
"GetWorkerBuildIdOrdering": func() interface{} { return new(workflowservice.GetWorkerBuildIdOrderingResponse) },
107-
108-
"StartBatchOperation": func() interface{} { return new(workflowservice.StartBatchOperationResponse) },
109-
"StopBatchOperation": func() interface{} { return new(workflowservice.StopBatchOperationResponse) },
110-
"DescribeBatchOperation": func() interface{} { return new(workflowservice.DescribeBatchOperationResponse) },
111-
"ListBatchOperations": func() interface{} { return new(workflowservice.ListBatchOperationsResponse) },
62+
globalAPIResponses = map[string]responseConstructorFn{
63+
"DescribeTaskQueue": func() any { return &workflowservice.DescribeTaskQueueResponse{} },
64+
"DescribeWorkflowExecution": func() any { return &workflowservice.DescribeWorkflowExecutionResponse{} },
65+
"GetWorkflowExecutionHistory": func() any { return &workflowservice.GetWorkflowExecutionHistoryResponse{} },
66+
"GetWorkflowExecutionHistoryReverse": func() any { return &workflowservice.GetWorkflowExecutionHistoryReverseResponse{} },
67+
"ListArchivedWorkflowExecutions": func() any { return &workflowservice.ListArchivedWorkflowExecutionsResponse{} },
68+
"ListClosedWorkflowExecutions": func() any { return &workflowservice.ListClosedWorkflowExecutionsResponse{} },
69+
"ListOpenWorkflowExecutions": func() any { return &workflowservice.ListOpenWorkflowExecutionsResponse{} },
70+
"ListWorkflowExecutions": func() any { return &workflowservice.ListWorkflowExecutionsResponse{} },
71+
"ScanWorkflowExecutions": func() any { return &workflowservice.ScanWorkflowExecutionsResponse{} },
72+
"CountWorkflowExecutions": func() any { return &workflowservice.CountWorkflowExecutionsResponse{} },
73+
"PollActivityTaskQueue": func() any { return &workflowservice.PollActivityTaskQueueResponse{} },
74+
"PollWorkflowTaskQueue": func() any { return &workflowservice.PollWorkflowTaskQueueResponse{} },
75+
"QueryWorkflow": func() any { return &workflowservice.QueryWorkflowResponse{} },
76+
"RecordActivityTaskHeartbeat": func() any { return &workflowservice.RecordActivityTaskHeartbeatResponse{} },
77+
"RecordActivityTaskHeartbeatById": func() any { return &workflowservice.RecordActivityTaskHeartbeatByIdResponse{} },
78+
"RequestCancelWorkflowExecution": func() any { return &workflowservice.RequestCancelWorkflowExecutionResponse{} },
79+
"ResetStickyTaskQueue": func() any { return &workflowservice.ResetStickyTaskQueueResponse{} },
80+
"ResetWorkflowExecution": func() any { return &workflowservice.ResetWorkflowExecutionResponse{} },
81+
"RespondActivityTaskCanceled": func() any { return &workflowservice.RespondActivityTaskCanceledResponse{} },
82+
"RespondActivityTaskCanceledById": func() any { return &workflowservice.RespondActivityTaskCanceledByIdResponse{} },
83+
"RespondActivityTaskCompleted": func() any { return &workflowservice.RespondActivityTaskCompletedResponse{} },
84+
"RespondActivityTaskCompletedById": func() any { return &workflowservice.RespondActivityTaskCompletedByIdResponse{} },
85+
"RespondActivityTaskFailed": func() any { return &workflowservice.RespondActivityTaskFailedResponse{} },
86+
"RespondActivityTaskFailedById": func() any { return &workflowservice.RespondActivityTaskFailedByIdResponse{} },
87+
"RespondWorkflowTaskCompleted": func() any { return &workflowservice.RespondWorkflowTaskCompletedResponse{} },
88+
"RespondWorkflowTaskFailed": func() any { return &workflowservice.RespondWorkflowTaskFailedResponse{} },
89+
"RespondQueryTaskCompleted": func() any { return &workflowservice.RespondQueryTaskCompletedResponse{} },
90+
"SignalWithStartWorkflowExecution": func() any { return &workflowservice.SignalWithStartWorkflowExecutionResponse{} },
91+
"SignalWorkflowExecution": func() any { return &workflowservice.SignalWorkflowExecutionResponse{} },
92+
"StartWorkflowExecution": func() any { return &workflowservice.StartWorkflowExecutionResponse{} },
93+
"UpdateWorkflowExecution": func() any { return &workflowservice.UpdateWorkflowExecutionResponse{} },
94+
"TerminateWorkflowExecution": func() any { return &workflowservice.TerminateWorkflowExecutionResponse{} },
95+
"DeleteWorkflowExecution": func() any { return &workflowservice.DeleteWorkflowExecutionResponse{} },
96+
"ListTaskQueuePartitions": func() any { return &workflowservice.ListTaskQueuePartitionsResponse{} },
97+
98+
"CreateSchedule": func() any { return &workflowservice.CreateScheduleResponse{} },
99+
"DescribeSchedule": func() any { return &workflowservice.DescribeScheduleResponse{} },
100+
"UpdateSchedule": func() any { return &workflowservice.UpdateScheduleResponse{} },
101+
"PatchSchedule": func() any { return &workflowservice.PatchScheduleResponse{} },
102+
"DeleteSchedule": func() any { return &workflowservice.DeleteScheduleResponse{} },
103+
"ListSchedules": func() any { return &workflowservice.ListSchedulesResponse{} },
104+
"ListScheduleMatchingTimes": func() any { return &workflowservice.ListScheduleMatchingTimesResponse{} },
105+
"UpdateWorkerBuildIdOrdering": func() any { return &workflowservice.UpdateWorkerBuildIdOrderingResponse{} },
106+
"GetWorkerBuildIdOrdering": func() any { return &workflowservice.GetWorkerBuildIdOrderingResponse{} },
107+
108+
"StartBatchOperation": func() any { return &workflowservice.StartBatchOperationResponse{} },
109+
"StopBatchOperation": func() any { return &workflowservice.StopBatchOperationResponse{} },
110+
"DescribeBatchOperation": func() any { return &workflowservice.DescribeBatchOperationResponse{} },
111+
"ListBatchOperations": func() any { return &workflowservice.ListBatchOperationsResponse{} },
112112
}
113113
)
114114

115115
type (
116-
respAllocFn func() interface{}
116+
responseConstructorFn func() any
117117

118118
// RedirectionInterceptor is simple wrapper over frontend service, doing redirection based on policy
119119
RedirectionInterceptor struct {
@@ -162,32 +162,37 @@ var _ grpc.UnaryServerInterceptor = (*RedirectionInterceptor)(nil).Intercept
162162

163163
func (i *RedirectionInterceptor) Intercept(
164164
ctx context.Context,
165-
req interface{},
165+
req any,
166166
info *grpc.UnaryServerInfo,
167167
handler grpc.UnaryHandler,
168-
) (_ interface{}, retError error) {
168+
) (_ any, retError error) {
169169
defer log.CapturePanic(i.logger, &retError)
170170

171+
if _, isWorkflowHandler := info.Server.(*WorkflowHandler); !isWorkflowHandler {
172+
return handler(ctx, req)
173+
}
174+
171175
_, methodName := interceptor.SplitMethodName(info.FullMethod)
172-
if _, ok := localAPIResults[methodName]; ok {
176+
if _, ok := localAPIResponses[methodName]; ok {
173177
return i.handleLocalAPIInvocation(ctx, req, handler, methodName)
174178
}
175-
if respAllocFn, ok := globalAPIResults[methodName]; ok {
179+
if raFn, ok := globalAPIResponses[methodName]; ok {
176180
namespaceName := interceptor.GetNamespace(i.namespaceCache, req)
177-
return i.handleRedirectAPIInvocation(ctx, req, info, handler, methodName, respAllocFn, namespaceName)
181+
return i.handleRedirectAPIInvocation(ctx, req, info, handler, methodName, raFn, namespaceName)
178182
}
179183

180-
// this should not happen for frontend APIs but ok for admin and other future handlers
181-
i.logger.Debug(fmt.Sprintf("RedirectionInterceptor encountered unknown API: %v", info.FullMethod))
184+
// This should not happen unless new API is added without updating localAPIResponses and globalAPIResponses maps.
185+
// Also covered by unit test.
186+
i.logger.Warn("RedirectionInterceptor encountered unknown API", tag.Name(info.FullMethod))
182187
return handler(ctx, req)
183188
}
184189

185190
func (i *RedirectionInterceptor) handleLocalAPIInvocation(
186191
ctx context.Context,
187-
req interface{},
192+
req any,
188193
handler grpc.UnaryHandler,
189194
methodName string,
190-
) (_ interface{}, retError error) {
195+
) (_ any, retError error) {
191196
scope, startTime := i.beforeCall(dcRedirectionMetricsPrefix + methodName)
192197
defer func() {
193198
i.afterCall(scope, startTime, i.currentClusterName, retError)
@@ -197,14 +202,14 @@ func (i *RedirectionInterceptor) handleLocalAPIInvocation(
197202

198203
func (i *RedirectionInterceptor) handleRedirectAPIInvocation(
199204
ctx context.Context,
200-
req interface{},
205+
req any,
201206
info *grpc.UnaryServerInfo,
202207
handler grpc.UnaryHandler,
203208
methodName string,
204-
respAllocFn func() interface{},
209+
respCtorFn responseConstructorFn,
205210
namespaceName namespace.Name,
206-
) (_ interface{}, retError error) {
207-
var resp interface{}
211+
) (_ any, retError error) {
212+
var resp any
208213
var clusterName string
209214
var err error
210215

@@ -222,7 +227,7 @@ func (i *RedirectionInterceptor) handleRedirectAPIInvocation(
222227
if err != nil {
223228
return err
224229
}
225-
resp = respAllocFn()
230+
resp = respCtorFn()
226231
err = remoteClient.Invoke(ctx, info.FullMethod, req, resp)
227232
if err != nil {
228233
return err

service/frontend/redirection_interceptor_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (s *redirectionInterceptorSuite) TearDownTest() {
106106

107107
func (s *redirectionInterceptorSuite) TestLocalAPI() {
108108
apis := make(map[string]struct{})
109-
for api := range localAPIResults {
109+
for api := range localAPIResponses {
110110
apis[api] = struct{}{}
111111
}
112112
s.Equal(map[string]struct{}{
@@ -124,7 +124,7 @@ func (s *redirectionInterceptorSuite) TestLocalAPI() {
124124

125125
func (s *redirectionInterceptorSuite) TestGlobalAPI() {
126126
apis := make(map[string]struct{})
127-
for api := range globalAPIResults {
127+
for api := range globalAPIResponses {
128128
apis[api] = struct{}{}
129129
}
130130
s.Equal(map[string]struct{}{
@@ -189,10 +189,10 @@ func (s *redirectionInterceptorSuite) TestAPIResultMapping() {
189189
}
190190

191191
actualAPIs := make(map[string]interface{})
192-
for api, respAllocFn := range localAPIResults {
192+
for api, respAllocFn := range localAPIResponses {
193193
actualAPIs[api] = reflect.TypeOf(respAllocFn())
194194
}
195-
for api, respAllocFn := range globalAPIResults {
195+
for api, respAllocFn := range globalAPIResponses {
196196
actualAPIs[api] = reflect.TypeOf(respAllocFn())
197197
}
198198
s.Equal(expectedAPIs, actualAPIs)
@@ -250,7 +250,7 @@ func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_Local() {
250250
info,
251251
handler,
252252
methodName,
253-
globalAPIResults[methodName],
253+
globalAPIResponses[methodName],
254254
namespaceName,
255255
)
256256
s.NoError(err)
@@ -293,7 +293,7 @@ func (s *redirectionInterceptorSuite) TestHandleLocalAPIInvocation_Redirect() {
293293
info,
294294
nil,
295295
methodName,
296-
globalAPIResults[methodName],
296+
globalAPIResponses[methodName],
297297
namespaceName,
298298
)
299299
s.NoError(err)

0 commit comments

Comments
 (0)