Skip to content

Commit bd06826

Browse files
authored
Read cluster shard count from DB (#3788)
1 parent dc932a1 commit bd06826

File tree

3 files changed

+18
-1
lines changed

3 files changed

+18
-1
lines changed

common/cluster/metadata.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ type (
104104
InitialFailoverVersion int64 `yaml:"initialFailoverVersion"`
105105
// Address indicate the remote service address(Host:Port). Host can be DNS name.
106106
RPCAddress string `yaml:"rpcAddress"`
107-
ShardCount int32
107+
ShardCount int32 `yaml:"-"` // Ignore this field when loading config.
108108
// private field to track cluster information updates
109109
version int64
110110
}
@@ -401,6 +401,7 @@ func (m *metadataImpl) RegisterMetadataChangeCallback(callbackId any, cb Callbac
401401
Enabled: clusterInfo.Enabled,
402402
InitialFailoverVersion: clusterInfo.InitialFailoverVersion,
403403
RPCAddress: clusterInfo.RPCAddress,
404+
ShardCount: clusterInfo.ShardCount,
404405
version: clusterInfo.version,
405406
}
406407
}
@@ -454,6 +455,7 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
454455
Enabled: newClusterInfo.Enabled,
455456
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
456457
RPCAddress: newClusterInfo.RPCAddress,
458+
ShardCount: newClusterInfo.ShardCount,
457459
version: newClusterInfo.version,
458460
}
459461
} else if newClusterInfo.version > oldClusterInfo.version {
@@ -468,12 +470,14 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
468470
Enabled: oldClusterInfo.Enabled,
469471
InitialFailoverVersion: oldClusterInfo.InitialFailoverVersion,
470472
RPCAddress: oldClusterInfo.RPCAddress,
473+
ShardCount: oldClusterInfo.ShardCount,
471474
version: oldClusterInfo.version,
472475
}
473476
newEntries[clusterName] = &ClusterInformation{
474477
Enabled: newClusterInfo.Enabled,
475478
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
476479
RPCAddress: newClusterInfo.RPCAddress,
480+
ShardCount: newClusterInfo.ShardCount,
477481
version: newClusterInfo.version,
478482
}
479483
}
@@ -577,6 +581,7 @@ func (m *metadataImpl) listAllClusterMetadataFromDB(
577581
Enabled: getClusterResp.GetIsConnectionEnabled(),
578582
InitialFailoverVersion: getClusterResp.GetInitialFailoverVersion(),
579583
RPCAddress: getClusterResp.GetClusterAddress(),
584+
ShardCount: getClusterResp.GetHistoryShardCount(),
580585
version: getClusterResp.Version,
581586
}
582587
}

common/cluster/metadata_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ func (s *metadataSuite) SetupTest() {
8383
Enabled: true,
8484
InitialFailoverVersion: int64(1),
8585
RPCAddress: uuid.New(),
86+
ShardCount: 1,
8687
version: 1,
8788
},
8889
s.secondClusterName: {
8990
Enabled: true,
9091
InitialFailoverVersion: int64(4),
9192
RPCAddress: uuid.New(),
93+
ShardCount: 2,
9294
version: 1,
9395
},
9496
}
@@ -174,6 +176,7 @@ func (s *metadataSuite) Test_RefreshClusterMetadata_Success() {
174176
ClusterName: s.clusterName,
175177
IsConnectionEnabled: true,
176178
InitialFailoverVersion: 1,
179+
HistoryShardCount: 1,
177180
ClusterAddress: uuid.New(),
178181
},
179182
Version: 1,
@@ -183,6 +186,7 @@ func (s *metadataSuite) Test_RefreshClusterMetadata_Success() {
183186
ClusterName: id,
184187
IsConnectionEnabled: true,
185188
InitialFailoverVersion: 2,
189+
HistoryShardCount: 2,
186190
ClusterAddress: uuid.New(),
187191
},
188192
Version: 2,
@@ -207,6 +211,7 @@ func (s *metadataSuite) Test_ListAllClusterMetadataFromDB_Success() {
207211
ClusterName: s.clusterName,
208212
IsConnectionEnabled: true,
209213
InitialFailoverVersion: 1,
214+
HistoryShardCount: 1,
210215
ClusterAddress: uuid.New(),
211216
},
212217
Version: 1,
@@ -225,6 +230,7 @@ func (s *metadataSuite) Test_ListAllClusterMetadataFromDB_Success() {
225230
ClusterName: newClusterName,
226231
IsConnectionEnabled: true,
227232
InitialFailoverVersion: 2,
233+
HistoryShardCount: 2,
228234
ClusterAddress: uuid.New(),
229235
},
230236
Version: 2,

temporal/fx.go

+6
Original file line numberDiff line numberDiff line change
@@ -767,10 +767,16 @@ func loadClusterInformationFromStore(ctx context.Context, config *config.Config,
767767
return err
768768
}
769769
metadata := item.(*persistence.GetClusterMetadataResponse)
770+
shardCount := metadata.HistoryShardCount
771+
if shardCount == 0 {
772+
// This is to add backward compatibility to the config based cluster connection.
773+
shardCount = config.Persistence.NumHistoryShards
774+
}
770775
newMetadata := cluster.ClusterInformation{
771776
Enabled: metadata.IsConnectionEnabled,
772777
InitialFailoverVersion: metadata.InitialFailoverVersion,
773778
RPCAddress: metadata.ClusterAddress,
779+
ShardCount: shardCount,
774780
}
775781
if staticClusterMetadata, ok := config.ClusterMetadata.ClusterInformation[metadata.ClusterName]; ok {
776782
if metadata.ClusterName != config.ClusterMetadata.CurrentClusterName {

0 commit comments

Comments
 (0)