Skip to content

Commit 3948854

Browse files
authored
Allow connect clusters with different shards (#3777)
* Allow connect clusters with different shards
1 parent 51fd1dd commit 3948854

File tree

4 files changed

+128
-34
lines changed

4 files changed

+128
-34
lines changed

service/frontend/adminHandler.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -1580,9 +1580,15 @@ func (adh *AdminHandler) validateRemoteClusterMetadata(metadata *adminservice.De
15801580
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to failover version increment mismatch")
15811581
}
15821582
if metadata.GetHistoryShardCount() != adh.config.NumHistoryShards {
1583-
// cluster shard number not equal
1584-
// TODO: remove this check once we support different shard numbers
1585-
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to history shard number mismatch")
1583+
remoteShardCount := metadata.GetHistoryShardCount()
1584+
large := remoteShardCount
1585+
small := adh.config.NumHistoryShards
1586+
if large < small {
1587+
small, large = large, small
1588+
}
1589+
if large%small != 0 {
1590+
return serviceerror.NewInvalidArgument("Remote cluster shard number and local cluster shard number are not multiples.")
1591+
}
15861592
}
15871593
if !metadata.IsGlobalNamespaceEnabled {
15881594
// remote cluster doesn't support global namespace

service/frontend/adminHandler_test.go

+56-14
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ func (s *adminHandlerSuite) SetupTest() {
134134
NumHistoryShards: 1,
135135
}
136136

137-
cfg := &Config{}
137+
cfg := &Config{
138+
NumHistoryShards: 4,
139+
}
138140
args := NewAdminHandlerArgs{
139141
persistenceConfig,
140142
cfg,
@@ -793,7 +795,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success()
793795
&adminservice.DescribeClusterResponse{
794796
ClusterId: clusterId,
795797
ClusterName: clusterName,
796-
HistoryShardCount: 0,
798+
HistoryShardCount: 4,
797799
FailoverVersionIncrement: 0,
798800
InitialFailoverVersion: 0,
799801
IsGlobalNamespaceEnabled: true,
@@ -805,7 +807,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success()
805807
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
806808
ClusterMetadata: persistencespb.ClusterMetadata{
807809
ClusterName: clusterName,
808-
HistoryShardCount: 0,
810+
HistoryShardCount: 4,
809811
ClusterId: clusterId,
810812
ClusterAddress: rpcAddress,
811813
FailoverVersionIncrement: 0,
@@ -832,7 +834,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success
832834
&adminservice.DescribeClusterResponse{
833835
ClusterId: clusterId,
834836
ClusterName: clusterName,
835-
HistoryShardCount: 0,
837+
HistoryShardCount: 4,
836838
FailoverVersionIncrement: 0,
837839
InitialFailoverVersion: 0,
838840
IsGlobalNamespaceEnabled: true,
@@ -844,7 +846,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success
844846
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
845847
ClusterMetadata: persistencespb.ClusterMetadata{
846848
ClusterName: clusterName,
847-
HistoryShardCount: 0,
849+
HistoryShardCount: 4,
848850
ClusterId: clusterId,
849851
ClusterAddress: rpcAddress,
850852
FailoverVersionIncrement: 0,
@@ -901,7 +903,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Failov
901903
s.IsType(&serviceerror.InvalidArgument{}, err)
902904
}
903905

904-
func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCountMismatch() {
906+
func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCount_Invalid() {
905907
var rpcAddress = uuid.New()
906908
var clusterName = uuid.New()
907909
var clusterId = uuid.New()
@@ -914,7 +916,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardC
914916
&adminservice.DescribeClusterResponse{
915917
ClusterId: clusterId,
916918
ClusterName: clusterName,
917-
HistoryShardCount: 1000,
919+
HistoryShardCount: 5,
918920
FailoverVersionIncrement: 0,
919921
InitialFailoverVersion: 0,
920922
IsGlobalNamespaceEnabled: true,
@@ -924,6 +926,46 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardC
924926
s.IsType(&serviceerror.InvalidArgument{}, err)
925927
}
926928

929+
func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ShardCount_Multiple() {
930+
var rpcAddress = uuid.New()
931+
var clusterName = uuid.New()
932+
var clusterId = uuid.New()
933+
var recordVersion int64 = 5
934+
935+
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
936+
s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
937+
s.mockClientFactory.EXPECT().NewRemoteAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
938+
s.mockAdminClient,
939+
)
940+
s.mockAdminClient.EXPECT().DescribeCluster(gomock.Any(), &adminservice.DescribeClusterRequest{}).Return(
941+
&adminservice.DescribeClusterResponse{
942+
ClusterId: clusterId,
943+
ClusterName: clusterName,
944+
HistoryShardCount: 16,
945+
FailoverVersionIncrement: 0,
946+
InitialFailoverVersion: 0,
947+
IsGlobalNamespaceEnabled: true,
948+
}, nil)
949+
s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return(
950+
&persistence.GetClusterMetadataResponse{
951+
Version: recordVersion,
952+
}, nil)
953+
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
954+
ClusterMetadata: persistencespb.ClusterMetadata{
955+
ClusterName: clusterName,
956+
HistoryShardCount: 16,
957+
ClusterId: clusterId,
958+
ClusterAddress: rpcAddress,
959+
FailoverVersionIncrement: 0,
960+
InitialFailoverVersion: 0,
961+
IsGlobalNamespaceEnabled: true,
962+
},
963+
Version: recordVersion,
964+
}).Return(true, nil)
965+
_, err := s.handler.AddOrUpdateRemoteCluster(context.Background(), &adminservice.AddOrUpdateRemoteClusterRequest{FrontendAddress: rpcAddress})
966+
s.NoError(err)
967+
}
968+
927969
func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_GlobalNamespaceDisabled() {
928970
var rpcAddress = uuid.New()
929971
var clusterName = uuid.New()
@@ -937,7 +979,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Global
937979
&adminservice.DescribeClusterResponse{
938980
ClusterId: clusterId,
939981
ClusterName: clusterName,
940-
HistoryShardCount: 0,
982+
HistoryShardCount: 4,
941983
FailoverVersionIncrement: 0,
942984
InitialFailoverVersion: 0,
943985
IsGlobalNamespaceEnabled: false,
@@ -963,7 +1005,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Initia
9631005
&adminservice.DescribeClusterResponse{
9641006
ClusterId: clusterId,
9651007
ClusterName: clusterName,
966-
HistoryShardCount: 0,
1008+
HistoryShardCount: 4,
9671009
FailoverVersionIncrement: 0,
9681010
InitialFailoverVersion: 0,
9691011
IsGlobalNamespaceEnabled: true,
@@ -1001,7 +1043,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_GetClusterMetadata_Err
10011043
&adminservice.DescribeClusterResponse{
10021044
ClusterId: clusterId,
10031045
ClusterName: clusterName,
1004-
HistoryShardCount: 0,
1046+
HistoryShardCount: 4,
10051047
FailoverVersionIncrement: 0,
10061048
InitialFailoverVersion: 0,
10071049
IsGlobalNamespaceEnabled: true,
@@ -1028,7 +1070,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_Er
10281070
&adminservice.DescribeClusterResponse{
10291071
ClusterId: clusterId,
10301072
ClusterName: clusterName,
1031-
HistoryShardCount: 0,
1073+
HistoryShardCount: 4,
10321074
FailoverVersionIncrement: 0,
10331075
InitialFailoverVersion: 0,
10341076
IsGlobalNamespaceEnabled: true,
@@ -1040,7 +1082,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_Er
10401082
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
10411083
ClusterMetadata: persistencespb.ClusterMetadata{
10421084
ClusterName: clusterName,
1043-
HistoryShardCount: 0,
1085+
HistoryShardCount: 4,
10441086
ClusterId: clusterId,
10451087
ClusterAddress: rpcAddress,
10461088
FailoverVersionIncrement: 0,
@@ -1067,7 +1109,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No
10671109
&adminservice.DescribeClusterResponse{
10681110
ClusterId: clusterId,
10691111
ClusterName: clusterName,
1070-
HistoryShardCount: 0,
1112+
HistoryShardCount: 4,
10711113
FailoverVersionIncrement: 0,
10721114
InitialFailoverVersion: 0,
10731115
IsGlobalNamespaceEnabled: true,
@@ -1079,7 +1121,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No
10791121
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
10801122
ClusterMetadata: persistencespb.ClusterMetadata{
10811123
ClusterName: clusterName,
1082-
HistoryShardCount: 0,
1124+
HistoryShardCount: 4,
10831125
ClusterId: clusterId,
10841126
ClusterAddress: rpcAddress,
10851127
FailoverVersionIncrement: 0,

service/frontend/operator_handler.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -499,9 +499,15 @@ func (h *OperatorHandlerImpl) validateRemoteClusterMetadata(metadata *adminservi
499499
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to failover version increment mismatch")
500500
}
501501
if metadata.GetHistoryShardCount() != h.config.NumHistoryShards {
502-
// cluster shard number not equal
503-
// TODO: remove this check once we support different shard numbers
504-
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to history shard number mismatch")
502+
remoteShardCount := metadata.GetHistoryShardCount()
503+
large := remoteShardCount
504+
small := h.config.NumHistoryShards
505+
if large < small {
506+
small, large = large, small
507+
}
508+
if large%small != 0 {
509+
return serviceerror.NewInvalidArgument("Remote cluster shard number and local cluster shard number are not multiples.")
510+
}
505511
}
506512
if !metadata.IsGlobalNamespaceEnabled {
507513
// remote cluster doesn't support global namespace

service/frontend/operator_handler_test.go

+54-14
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *operatorHandlerSuite) SetupTest() {
7878
s.mockResource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes()
7979

8080
args := NewOperatorHandlerImplArgs{
81-
&Config{},
81+
&Config{NumHistoryShards: 4},
8282
nil,
8383
s.mockResource.ESClient,
8484
s.mockResource.Logger,
@@ -506,7 +506,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success
506506
&adminservice.DescribeClusterResponse{
507507
ClusterId: clusterId,
508508
ClusterName: clusterName,
509-
HistoryShardCount: 0,
509+
HistoryShardCount: 4,
510510
FailoverVersionIncrement: 0,
511511
InitialFailoverVersion: 0,
512512
IsGlobalNamespaceEnabled: true,
@@ -518,7 +518,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success
518518
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
519519
ClusterMetadata: persistencespb.ClusterMetadata{
520520
ClusterName: clusterName,
521-
HistoryShardCount: 0,
521+
HistoryShardCount: 4,
522522
ClusterId: clusterId,
523523
ClusterAddress: rpcAddress,
524524
FailoverVersionIncrement: 0,
@@ -545,7 +545,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Succ
545545
&adminservice.DescribeClusterResponse{
546546
ClusterId: clusterId,
547547
ClusterName: clusterName,
548-
HistoryShardCount: 0,
548+
HistoryShardCount: 4,
549549
FailoverVersionIncrement: 0,
550550
InitialFailoverVersion: 0,
551551
IsGlobalNamespaceEnabled: true,
@@ -557,7 +557,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Succ
557557
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
558558
ClusterMetadata: persistencespb.ClusterMetadata{
559559
ClusterName: clusterName,
560-
HistoryShardCount: 0,
560+
HistoryShardCount: 4,
561561
ClusterId: clusterId,
562562
ClusterAddress: rpcAddress,
563563
FailoverVersionIncrement: 0,
@@ -614,7 +614,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Fai
614614
s.IsType(&serviceerror.InvalidArgument{}, err)
615615
}
616616

617-
func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCountMismatch() {
617+
func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCount_Invalid() {
618618
var rpcAddress = uuid.New()
619619
var clusterName = uuid.New()
620620
var clusterId = uuid.New()
@@ -627,7 +627,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Sha
627627
&adminservice.DescribeClusterResponse{
628628
ClusterId: clusterId,
629629
ClusterName: clusterName,
630-
HistoryShardCount: 1000,
630+
HistoryShardCount: 5,
631631
FailoverVersionIncrement: 0,
632632
InitialFailoverVersion: 0,
633633
IsGlobalNamespaceEnabled: true,
@@ -637,6 +637,46 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Sha
637637
s.IsType(&serviceerror.InvalidArgument{}, err)
638638
}
639639

640+
func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ShardCount_Multiple() {
641+
var rpcAddress = uuid.New()
642+
var clusterName = uuid.New()
643+
var clusterId = uuid.New()
644+
var recordVersion int64 = 5
645+
646+
s.mockResource.ClusterMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
647+
s.mockResource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
648+
s.mockResource.ClientFactory.EXPECT().NewRemoteAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
649+
s.mockResource.RemoteAdminClient,
650+
)
651+
s.mockResource.RemoteAdminClient.EXPECT().DescribeCluster(gomock.Any(), &adminservice.DescribeClusterRequest{}).Return(
652+
&adminservice.DescribeClusterResponse{
653+
ClusterId: clusterId,
654+
ClusterName: clusterName,
655+
HistoryShardCount: 16,
656+
FailoverVersionIncrement: 0,
657+
InitialFailoverVersion: 0,
658+
IsGlobalNamespaceEnabled: true,
659+
}, nil)
660+
s.mockResource.ClusterMetadataMgr.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return(
661+
&persistence.GetClusterMetadataResponse{
662+
Version: recordVersion,
663+
}, nil)
664+
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
665+
ClusterMetadata: persistencespb.ClusterMetadata{
666+
ClusterName: clusterName,
667+
HistoryShardCount: 16,
668+
ClusterId: clusterId,
669+
ClusterAddress: rpcAddress,
670+
FailoverVersionIncrement: 0,
671+
InitialFailoverVersion: 0,
672+
IsGlobalNamespaceEnabled: true,
673+
},
674+
Version: recordVersion,
675+
}).Return(true, nil)
676+
_, err := s.handler.AddOrUpdateRemoteCluster(context.Background(), &operatorservice.AddOrUpdateRemoteClusterRequest{FrontendAddress: rpcAddress})
677+
s.NoError(err)
678+
}
679+
640680
func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_GlobalNamespaceDisabled() {
641681
var rpcAddress = uuid.New()
642682
var clusterName = uuid.New()
@@ -650,7 +690,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Glo
650690
&adminservice.DescribeClusterResponse{
651691
ClusterId: clusterId,
652692
ClusterName: clusterName,
653-
HistoryShardCount: 0,
693+
HistoryShardCount: 4,
654694
FailoverVersionIncrement: 0,
655695
InitialFailoverVersion: 0,
656696
IsGlobalNamespaceEnabled: false,
@@ -676,7 +716,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Ini
676716
&adminservice.DescribeClusterResponse{
677717
ClusterId: clusterId,
678718
ClusterName: clusterName,
679-
HistoryShardCount: 0,
719+
HistoryShardCount: 4,
680720
FailoverVersionIncrement: 0,
681721
InitialFailoverVersion: 0,
682722
IsGlobalNamespaceEnabled: true,
@@ -714,7 +754,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_GetClusterMetadata_
714754
&adminservice.DescribeClusterResponse{
715755
ClusterId: clusterId,
716756
ClusterName: clusterName,
717-
HistoryShardCount: 0,
757+
HistoryShardCount: 4,
718758
FailoverVersionIncrement: 0,
719759
InitialFailoverVersion: 0,
720760
IsGlobalNamespaceEnabled: true,
@@ -741,7 +781,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
741781
&adminservice.DescribeClusterResponse{
742782
ClusterId: clusterId,
743783
ClusterName: clusterName,
744-
HistoryShardCount: 0,
784+
HistoryShardCount: 4,
745785
FailoverVersionIncrement: 0,
746786
InitialFailoverVersion: 0,
747787
IsGlobalNamespaceEnabled: true,
@@ -753,7 +793,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
753793
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
754794
ClusterMetadata: persistencespb.ClusterMetadata{
755795
ClusterName: clusterName,
756-
HistoryShardCount: 0,
796+
HistoryShardCount: 4,
757797
ClusterId: clusterId,
758798
ClusterAddress: rpcAddress,
759799
FailoverVersionIncrement: 0,
@@ -780,7 +820,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
780820
&adminservice.DescribeClusterResponse{
781821
ClusterId: clusterId,
782822
ClusterName: clusterName,
783-
HistoryShardCount: 0,
823+
HistoryShardCount: 4,
784824
FailoverVersionIncrement: 0,
785825
InitialFailoverVersion: 0,
786826
IsGlobalNamespaceEnabled: true,
@@ -792,7 +832,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
792832
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
793833
ClusterMetadata: persistencespb.ClusterMetadata{
794834
ClusterName: clusterName,
795-
HistoryShardCount: 0,
835+
HistoryShardCount: 4,
796836
ClusterId: clusterId,
797837
ClusterAddress: rpcAddress,
798838
FailoverVersionIncrement: 0,

0 commit comments

Comments
 (0)