Skip to content

Commit b132fc2

Browse files
authored
Redirect replication requests when shard count is different (#3789)
* Redirect replication requests when shard count is different
1 parent 26503cf commit b132fc2

7 files changed

+41
-37
lines changed

service/history/replication/dlq_handler.go

-5
Original file line numberDiff line numberDiff line change
@@ -340,15 +340,10 @@ func (r *dlqHandlerImpl) getOrCreateTaskExecutor(ctx context.Context, clusterNam
340340
if executor, ok := r.taskExecutors[clusterName]; ok {
341341
return executor, nil
342342
}
343-
engine, err := r.shard.GetEngine(ctx)
344-
if err != nil {
345-
return nil, err
346-
}
347343
taskExecutor := r.taskExecutorProvider(TaskExecutorParams{
348344
RemoteCluster: clusterName,
349345
Shard: r.shard,
350346
HistoryResender: r.resender,
351-
HistoryEngine: engine,
352347
DeleteManager: r.deleteManager,
353348
WorkflowCache: r.workflowCache,
354349
})

service/history/replication/dlq_handler_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ func (s *dlqHandlerSuite) SetupTest() {
127127
params.RemoteCluster,
128128
params.Shard,
129129
params.HistoryResender,
130-
params.HistoryEngine,
131130
params.DeleteManager,
132131
params.WorkflowCache,
133132
)

service/history/replication/fx.go

-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ func ReplicationTaskExecutorProvider() TaskExecutorProvider {
5858
params.RemoteCluster,
5959
params.Shard,
6060
params.HistoryResender,
61-
params.HistoryEngine,
6261
params.DeleteManager,
6362
params.WorkflowCache,
6463
)

service/history/replication/task_executor.go

+25-16
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ type (
5858
RemoteCluster string // TODO: Remove the remote cluster from the executor so that it can be a singleton.
5959
Shard shard.Context
6060
HistoryResender xdc.NDCHistoryResender
61-
HistoryEngine shard.Engine
6261
DeleteManager deletemanager.DeleteManager
6362
WorkflowCache wcache.Cache
6463
}
@@ -68,10 +67,9 @@ type (
6867
taskExecutorImpl struct {
6968
currentCluster string
7069
remoteCluster string
71-
shard shard.Context
70+
shardContext shard.Context
7271
namespaceRegistry namespace.Registry
7372
nDCHistoryResender xdc.NDCHistoryResender
74-
historyEngine shard.Engine
7573
deleteManager deletemanager.DeleteManager
7674
workflowCache wcache.Cache
7775
metricsHandler metrics.Handler
@@ -83,23 +81,21 @@ type (
8381
// The executor is used by 1) the DLQ replication task handler and 2) the history replication task processor.
8482
func NewTaskExecutor(
8583
remoteCluster string,
86-
shard shard.Context,
84+
shardContext shard.Context,
8785
nDCHistoryResender xdc.NDCHistoryResender,
88-
historyEngine shard.Engine,
8986
deleteManager deletemanager.DeleteManager,
9087
workflowCache wcache.Cache,
9188
) TaskExecutor {
9289
return &taskExecutorImpl{
93-
currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(),
90+
currentCluster: shardContext.GetClusterMetadata().GetCurrentClusterName(),
9491
remoteCluster: remoteCluster,
95-
shard: shard,
96-
namespaceRegistry: shard.GetNamespaceRegistry(),
92+
shardContext: shardContext,
93+
namespaceRegistry: shardContext.GetNamespaceRegistry(),
9794
nDCHistoryResender: nDCHistoryResender,
98-
historyEngine: historyEngine,
9995
deleteManager: deleteManager,
10096
workflowCache: workflowCache,
101-
metricsHandler: shard.GetMetricsHandler(),
102-
logger: shard.GetLogger(),
97+
metricsHandler: shardContext.GetMetricsHandler(),
98+
logger: shardContext.GetLogger(),
10399
}
104100
}
105101

@@ -167,7 +163,9 @@ func (e *taskExecutorImpl) handleActivityTask(
167163
ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId)
168164
defer cancel()
169165

170-
err = e.historyEngine.SyncActivity(ctx, request)
166+
// This might be extra cost if the workflow belongs to local shard.
167+
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
168+
_, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request)
171169
switch retryErr := err.(type) {
172170
case nil:
173171
return nil
@@ -206,7 +204,10 @@ func (e *taskExecutorImpl) handleActivityTask(
206204
e.logger.Error("error resend history for history event", tag.Error(resendErr))
207205
return err
208206
}
209-
return e.historyEngine.SyncActivity(ctx, request)
207+
// This might be extra cost if the workflow belongs to local shard.
208+
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
209+
_, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request)
210+
return err
210211

211212
default:
212213
return err
@@ -247,7 +248,9 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
247248
ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId)
248249
defer cancel()
249250

250-
err = e.historyEngine.ReplicateEventsV2(ctx, request)
251+
// This might be extra cost if the workflow belongs to local shard.
252+
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
253+
_, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request)
251254
switch retryErr := err.(type) {
252255
case nil:
253256
return nil
@@ -287,7 +290,10 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
287290
return err
288291
}
289292

290-
return e.historyEngine.ReplicateEventsV2(ctx, request)
293+
// This might be extra cost if the workflow belongs to local shard.
294+
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
295+
_, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request)
296+
return err
291297

292298
default:
293299
return err
@@ -312,10 +318,13 @@ func (e *taskExecutorImpl) handleSyncWorkflowStateTask(
312318
ctx, cancel := e.newTaskContext(ctx, executionInfo.NamespaceId)
313319
defer cancel()
314320

315-
return e.historyEngine.ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
321+
// This might be extra cost if the workflow belongs to local shard.
322+
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
323+
_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
316324
WorkflowState: attr.GetWorkflowState(),
317325
RemoteCluster: e.remoteCluster,
318326
})
327+
return err
319328
}
320329

321330
func (e *taskExecutorImpl) filterTask(

service/history/replication/task_executor_test.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ type (
6464
remoteCluster string
6565
mockResource *resourcetest.Test
6666
mockShard *shard.ContextTest
67-
mockEngine *shard.MockEngine
6867
config *configs.Config
6968
historyClient *historyservicemock.MockHistoryServiceClient
7069
mockNamespaceCache *namespace.MockRegistry
@@ -107,7 +106,6 @@ func (s *taskExecutorSuite) SetupTest() {
107106
}},
108107
s.config,
109108
)
110-
s.mockEngine = shard.NewMockEngine(s.controller)
111109
s.mockResource = s.mockShard.Resource
112110
s.mockNamespaceCache = s.mockResource.NamespaceCache
113111
s.clusterMetadata = s.mockResource.ClusterMetadata
@@ -117,12 +115,11 @@ func (s *taskExecutorSuite) SetupTest() {
117115

118116
s.clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
119117
s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).AnyTimes()
120-
118+
s.mockShard.SetHistoryClientForTesting(s.historyClient)
121119
s.replicationTaskExecutor = NewTaskExecutor(
122120
s.remoteCluster,
123121
s.mockShard,
124122
s.nDCHistoryResender,
125-
s.mockEngine,
126123
deletemanager.NewMockDeleteManager(s.controller),
127124
s.workflowCache,
128125
).(*taskExecutorImpl)
@@ -231,7 +228,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() {
231228
LastWorkerIdentity: "",
232229
}
233230

234-
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
231+
s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil)
235232
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
236233
s.NoError(err)
237234
}
@@ -284,7 +281,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
284281
345,
285282
456,
286283
)
287-
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(resendErr)
284+
s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(nil, resendErr)
288285
s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory(
289286
gomock.Any(),
290287
s.remoteCluster,
@@ -296,7 +293,8 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
296293
int64(345),
297294
int64(456),
298295
)
299-
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
296+
297+
s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil)
300298
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
301299
s.NoError(err)
302300
}
@@ -328,8 +326,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() {
328326
Events: nil,
329327
NewRunEvents: nil,
330328
}
331-
332-
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
329+
s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil)
333330
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
334331
s.NoError(err)
335332
}
@@ -372,7 +369,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
372369
345,
373370
456,
374371
)
375-
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(resendErr)
372+
s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil, resendErr)
376373
s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory(
377374
gomock.Any(),
378375
s.remoteCluster,
@@ -384,7 +381,8 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
384381
int64(345),
385382
int64(456),
386383
)
387-
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
384+
385+
s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil)
388386
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
389387
s.NoError(err)
390388
}
@@ -403,8 +401,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() {
403401
},
404402
},
405403
}
406-
407-
s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil)
404+
s.historyClient.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(&historyservice.ReplicateWorkflowStateResponse{}, nil)
408405

409406
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
410407
s.NoError(err)

service/history/replication/task_processor_manager.go

-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
190190
RemoteCluster: clusterName,
191191
Shard: r.shard,
192192
HistoryResender: r.resender,
193-
HistoryEngine: r.engine,
194193
DeleteManager: r.deleteMgr,
195194
WorkflowCache: r.workflowCache,
196195
}),

service/history/shard/context_testutil.go

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
"github.com/golang/mock/gomock"
3333

34+
"go.temporal.io/server/api/historyservice/v1"
3435
persistencespb "go.temporal.io/server/api/persistence/v1"
3536
"go.temporal.io/server/common/clock"
3637
"go.temporal.io/server/common/future"
@@ -133,6 +134,11 @@ func (s *ContextTest) SetEventsCacheForTesting(c events.Cache) {
133134
s.eventsCache = c
134135
}
135136

137+
// SetHistoryClientForTesting sets history client. Only used by tests.
138+
func (s *ContextTest) SetHistoryClientForTesting(client historyservice.HistoryServiceClient) {
139+
s.historyClient = client
140+
}
141+
136142
// StopForTest calls private method finishStop(). In general only the controller
137143
// should call that, but integration tests need to do it also to clean up any
138144
// background acquireShard goroutines that may exist.

0 commit comments

Comments
 (0)